CMS 3D CMS Logo

CrabHelper.py
Go to the documentation of this file.
1 import logging
2 import sys
3 import os
4 from importlib import import_module
5 import subprocess
6 import time
7 import tools
8 
9 log = logging.getLogger(__name__)
11 
def __init__(self):
    """Initialise the helper and import the crab helper module lazily.

    The crabFunctions module is imported when an instance is created, not at
    module import time, so that the classmethods stay usable (e.g. for CLI
    construction) before the crab environment has been sourced.
    """
    crab_functions_module = 'CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions'
    self.crabFunctions = import_module(crab_functions_module)
    # caches filled on first access by the ``crab`` and ``cert_info`` properties
    self._crab = self._cert_info = None
19 
def submit_crab_task(self):
    """Create, write and submit the crab config, then check the task state.

    Returns:
        True when running with the no-exec option (nothing is submitted), or
        when the task reports one of the accepted states after submission.

    Raises:
        RuntimeError: if the task state after submission is not one of
            QUEUED, SUBMITTED, COMPLETED or FINISHED.
    """
    log.info("Creating crab config")
    self.create_crab_config()
    # persist the config to disk; the returned path is what crab is given
    config_path = self.write_crabConfig()
    if self.options.no_exec:
        log.info("Runing with option no-exec exiting")
        return True

    log.info("Submitting crab job")
    self.crab.submit(config_path)
    log.info("crab job submitted. Waiting 30 seconds before first status call")
    time.sleep(30)

    submitted = self.crabFunctions.CrabTask(crab_config=config_path)
    submitted.update()
    accepted_states = ('QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
    if submitted.state not in accepted_states:
        failure = "Job submission not successful, crab state:%s" % submitted.state
        log.error(failure)
        raise RuntimeError(failure)
    log.info("Job in state %s" % submitted.state)
    return True
43 
def check_crabtask(self):
    """Poll the crab task until it completes, fails, or the checks run out.

    The task is updated up to ``self.options.max_checks`` times. Between two
    checks the user is prompted through ``tools.stdinWait`` with
    ``self.options.check_interval`` as its time argument (presumably a
    timeout in seconds -- confirm against tools.stdinWait); answering "q" or
    "Q" aborts the polling.

    Returns:
        True if running in no-exec mode, or once the task state is exactly
        "COMPLETED" (the output is then fetched into
        ``<local_path>/unmerged_results``). False if the task failed, the
        user aborted, or the task did not complete within the allowed checks.
    """
    print(self.crab_config_filepath)
    task = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                       initUpdate=False)
    if self.options.no_exec:
        log.info("Nothing to check in no-exec mode")
        return True

    # per-state job counters read from the task for the progress line
    job_counters = ("nUnsubmitted",
                    "nIdle",
                    "nRunning",
                    "nTransferring",
                    "nCooloff",
                    "nFailed",
                    "nFinished",
                    "nComplete")

    for n_check in range(self.options.max_checks):
        task.update()
        # Compare for equality: `state in ("COMPLETED")` is a substring test
        # on a plain string and would also accept "" or "COMPLETE".
        if task.state == "COMPLETED":
            print("Crab task complete. Getting output locally")
            output_path = os.path.join(self.local_path, "unmerged_results")
            self.get_output_files(task, output_path)
            return True
        if task.state in ("SUBMITFAILED", "FAILED"):
            print("Crab task failed")
            return False

        # summarise only the non-empty counters, dropping the leading "n"
        jobinfos = ""
        for counter in job_counters:
            njobs_in_state = getattr(task, counter)
            if njobs_in_state > 0:
                jobinfos += "%s: %d " % (counter[1:], njobs_in_state)

        # blank the current terminal line so the prompt can be redrawn in place
        sys.stdout.write("\r")
        sys.stdout.write(" " * tools.getTerminalSize()[0])
        sys.stdout.write("\r")
        prompt_text = "Check (%d/%d). Task state: %s (%s). Press q and enter to stop checks: " % (
            n_check, self.options.max_checks, task.state, jobinfos)
        user_input = tools.stdinWait(prompt_text, "", self.options.check_interval)
        if user_input in ("q", "Q"):
            return False

    print("Task not completed after %d checks (%d minutes)" % (
        self.options.max_checks, int(self.options.check_interval / 60.)))
    return False
88 
def voms_proxy_time_left(self):
    """Return the value printed by ``voms-proxy-info -timeleft`` as an int.

    Returns 0 when the command cannot be started, exits with a non-zero
    status, or prints nothing that parses as an integer, so callers can
    treat every failure as "no valid proxy".
    """
    try:
        process = subprocess.Popen(['voms-proxy-info', '-timeleft'],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
    except OSError:
        # voms client tools are not installed or not on PATH
        return 0
    stdout = process.communicate()[0]
    if process.returncode != 0:
        return 0
    # stderr is merged into stdout, so diagnostics may precede the number;
    # only the last whitespace-separated token is parsed.
    tokens = stdout.split()
    try:
        return int(tokens[-1])
    except (IndexError, ValueError):
        return 0
98 
def voms_proxy_create(self, passphrase = None):
    """Create a VOMS proxy for the 'cms' VO with a validity of 192 hours.

    Args:
        passphrase: if given, it is written (followed by a newline) to the
            stdin of ``voms-proxy-init``; otherwise the command runs attached
            to the caller's terminal.

    Raises:
        ProxyError: if ``voms-proxy-init`` exits with a non-zero status. When
            a passphrase was supplied, the captured command output is part of
            the message.
    """
    voms = 'cms'
    if not passphrase:
        exit_code = subprocess.call(['voms-proxy-init', '--voms', voms, '--valid', '192:00'])
        if exit_code != 0:
            raise ProxyError('Proxy initialization command failed.')
        return

    init_process = subprocess.Popen(['voms-proxy-init', '--voms', voms, '--valid', '192:00'],
                                    stdout=subprocess.PIPE,
                                    stdin=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
    captured = init_process.communicate(input=passphrase+'\n')[0]
    if init_process.returncode != 0:
        raise ProxyError('Proxy initialization command failed: %s'%captured)
112 
113 
115  """ Create a crab config object dependent on the chosen command option"""
116  from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
118  """ Fill common options in crab config """
119  ### General section
120  self.crab_config.add_section('General')
121  if "/" in self.crab_taskname:
122  raise ValueError( 'Sample contains "/" which is not allowed' )
123  self.crab_config.set( 'General', 'requestName', self.crab_taskname )
124  self.crab_config.set( 'General', 'workArea', self.local_path)
125  if self.options.no_log:
126  self.crab_config.set( 'General', 'transferLogs', 'False' )
127  else:
128  self.crab_config.set( 'General', 'transferLogs', 'True' )
129  ### JobType section
130  self.crab_config.add_section('JobType')
131  self.crab_config.set( 'JobType', 'pluginName', 'Analysis' )
132  self.crab_config.set( 'JobType', 'psetName', self.pset_path )
133  self.crab_config.set( 'JobType', 'outputFiles', self.output_files)
134  if self.input_files:
135  self.crab_config.set( 'JobType', 'inputFiles', self.input_files)
136  ### Data section
137  self.crab_config.add_section('Data')
138  self.crab_config.set('Data', 'inputDataset', self.options.datasetpath)
139  # set job splitting options
140  if self.options.datasettype =="MC":
141  self.crab_config.set('Data', 'splitting', 'FileBased')
142  self.crab_config.set('Data', 'unitsPerJob', str(self.options.filesPerJob) )
143  else:
144  self.crab_config.set('Data', 'splitting', 'LumiBased')
145  self.crab_config.set('Data', 'unitsPerJob', str(self.options.lumisPerJob) )
146  if self.options.runselection:
147  self.crab_config.set( "Data",
148  "runRange",
149  ",".join( self.options.runselection )
150  )
151  # set output path in compliance with crab3 structure
152  self.crab_config.set('Data', 'publication', False)
153  self.crab_config.set('Data', 'outputDatasetTag', self.remote_out_path["outputDatasetTag"])
154  self.crab_config.set('Data', 'outLFNDirBase', self.remote_out_path["outLFNDirBase"] )
155 
156  # set site section options
157  self.crab_config.add_section('Site')
158  self.crab_config.set('Site', 'storageSite', self.options.output_site)
159  self.crab_config.set('Site', 'whitelist', self.options.ce_white_list)
160  self.crab_config.set('Site', 'blacklist', self.options.ce_black_list)
161 
162  #set user section options if necessary
163 # if self.cert_info.voGroup or self.cert_info.voRole:
164 # self.crab_config.add_section('User')
165 # if self.cert_info.voGroup:
166 # self.crab_config.set('User', "voGroup", self.cert_info.voGroup)
167 # if self.cert_info.voRole:
168 # self.crab_config.set('User', "voRole", self.cert_info.voRole)
169  log.debug("Created crab config: %s " % self.crab_config_filename)
170 
def write_crabConfig(self):
    """Write the crab config into ``<working_dir>/<local_path>``.

    The target directory is created when missing. An existing config file is
    never overwritten.

    Returns:
        The full path of the written config file.

    Raises:
        IOError: if the config file already exists.
    """
    target_dir = os.path.join(self.options.working_dir, self.local_path)
    target_file = os.path.join(target_dir, self.crab_config_filename)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    if os.path.exists(target_file):
        raise IOError("file %s alrady exits"%(target_file))
    self.crab_config.writeCrabConfig(target_file)
    log.info( 'created crab config file %s'%target_file )
    return target_file
182 
def fill_options_from_crab_config(self):
    """Fill missing entries of ``self.options`` from an existing crab config.

    The run and trial numbers are parsed from ``Data.outputDatasetTag``,
    which the parsing below expects to look like ``Run<run>_v<trial>``. They
    are only applied when no run is set in the options. ``datasetpath`` and
    ``label`` are taken from the config only when the options do not define
    them.
    """
    # CrabTask lives in the lazily imported crabFunctions module; it is not
    # imported at module level in this file.
    crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filename)
    splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split("_")
    run, trial = splitinfo[0].split("Run")[-1], splitinfo[1].split("v")[-1]
    if not getattr(self.options, "run", None):
        self.options.run = int(run)
        # crab_taskname reads ``options.trial``
        self.options.trial = int(trial)
        # misspelt attribute set by earlier versions, kept for existing readers
        self.options.trail = self.options.trial
    if not hasattr(self.options, "datasetpath"):
        self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
    if not hasattr(self.options, "label"):
        self.options.label = crabtask.crabConfig.General.requestName.split("_")[0]
194 
@property
def crab(self):
    """Return the cached CrabController, creating it on first access.

    The controller is constructed with the VO group from ``cert_info`` when
    one is set, and without arguments otherwise.
    """
    if self._crab is None:
        controller_kwargs = {}
        vo_group = self.cert_info.voGroup
        if vo_group:
            # re-read the attribute for the call, as the controller is built
            # from the certificate's own value
            controller_kwargs["voGroup"] = self.cert_info.voGroup
        self._crab = self.crabFunctions.CrabController(**controller_kwargs)
    return self._crab
205 
@property
def cert_info(self):
    """Return the cached CertInfo object, creating it on first access.

    When ``voms_proxy_time_left()`` reports no remaining time, a warning is
    logged and a default proxy is created via ``voms_proxy_create()`` before
    the certificate information is read.
    """
    if not self._cert_info:
        if not self.voms_proxy_time_left() > 0:
            # Log the complete message (the second half used to overwrite the
            # first), and log it before the proxy is created so it is shown
            # even if the creation fails.
            log.warning("No valid proxy, a default proxy without a specific "
                        "VOGroup will be used")
            self.voms_proxy_create()
        self._cert_info = self.crabFunctions.CertInfo()
    return self._cert_info
216 
@property
def crab_config_filename(self):
    """Return the crab config file name.

    An explicit ``crab_config_path`` option takes precedence; otherwise the
    name is derived from the task name as ``crab_<taskname>_cfg.py``.
    """
    if not hasattr(self.options, "crab_config_path"):
        return 'crab_%s_cfg.py' % self.crab_taskname
    return self.options.crab_config_path
222 
@property
def crab_config_filepath(self):
    """Return the crab config location below ``<working_dir>/<local_path>``."""
    config_dir = os.path.join(self.options.working_dir, self.local_path)
    config_name = self.crab_config_filename
    return os.path.join(config_dir, config_name)
227 
@property
def crab_taskname(self):
    """Build the crab task name from the command line options.

    The name has the form
    ``<label>_<workflow>[_<workflow_mode>]_run_<run>_v<trial>``; the workflow
    mode is included only when the options define it.
    """
    prefix = self.options.label + "_" + self.options.workflow + "_"
    mode_part = ""
    if hasattr( self.options, "workflow_mode"):
        mode_part = self.options.workflow_mode + "_"
    run_part = "run_" + str(self.options.run)
    trial_part = "_v" + str(self.options.trial)
    return prefix + mode_part + run_part + trial_part
def fill_options_from_crab_config(self)
Definition: CrabHelper.py:183
def create_crab_config(self)
Definition: CrabHelper.py:114
This module extends the python configparser to create crab3 config files.
def crab_config_filepath(self)
Definition: CrabHelper.py:224
def voms_proxy_create(self, passphrase=None)
Definition: CrabHelper.py:99
def getTerminalSize()
Definition: tools.py:95
def submit_crab_task(self)
Definition: CrabHelper.py:20
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def voms_proxy_time_left(self)
Definition: CrabHelper.py:89
def write_crabConfig(self)
Definition: CrabHelper.py:171
def stdinWait(text, default, time, timeoutDisplay=None, kwargs)
Definition: tools.py:69
def crab_taskname(self)
Definition: CrabHelper.py:229
def check_crabtask(self)
Definition: CrabHelper.py:44
double split
Definition: MVATrainer.cc:139
def crab_config_filename(self)
Definition: CrabHelper.py:218