1 from __future__
import print_function
12 from CLIHelper
import CLIHelper
13 from CrabHelper
import CrabHelper
14 import FWCore.ParameterSet.Config
as cms
15 log = logging.getLogger(__name__)
18 """ This is the base class for all DTWorkflows and contains some 40 os.chdir(self.options.working_dir)
45 if self.options.command
in requirements_dict:
46 for option
in requirements_dict[self.options.command]:
47 if not (hasattr(self.
options, option)
48 and ( (getattr(self.
options,option))
49 or isinstance(getattr(self.
options,option), bool) )):
50 missing_options.append(option)
51 if len(missing_options) > 0:
52 err =
"The following CLI options are missing" 53 err +=
" for command %s: " % self.options.command
54 err +=
" ".
join(missing_options)
58 """ Generalized function to run workflow command""" 59 msg =
"Preparing %s workflow" % self.options.workflow
60 if hasattr(self.
options,
"command"):
61 msg +=
" for command %s" % self.options.command
63 if self.options.config_path:
76 run_function = getattr(self, self.options.command)
77 except AttributeError:
78 errmsg =
"Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
79 if hasattr(self.
options,
"workflow_mode"):
80 errmsg +=
"and workflow mode %s" % self.options.workflow_mode
81 raise NotImplementedError( errmsg.format(self.__class__.__name__,
82 self.options.command))
83 log.debug(
"Running command %s" % self.options.command)
88 """ Abstract implementation of prepare workflow function""" 89 errmsg =
"Class `{}` does not implement `{}`" 90 raise NotImplementedError( errmsg.format(self.__class__.__name__,
94 """ generalized function to perform several workflow mode commands in chain. 95 All commands must be specified in self.all_commands list in workflow mode specific 96 prepare function in child workflow objects. 99 for command
in self.all_commands:
100 self.options.command = command
107 """ Function to check status of submitted tasks """ 120 """ Add preselection to the process object stored in workflow_object""" 121 if not hasattr(self,
"process"):
122 raise NameError(
"Process is not initalized in workflow object")
123 pathsequence = self.options.preselection.split(
':')[0]
124 seqname = self.options.preselection.split(
':')[1]
125 self.process.load(pathsequence)
133 """ Add a local t0 database as input. Use the option local 134 if the pset is processed locally and not with crab. 137 connect = os.path.abspath(self.options.inputT0DB)
139 connect = os.path.basename(self.options.inputT0DB)
144 connect =
'sqlite_file:%s' % connect)
145 self.input_files.append(os.path.abspath(self.options.inputT0DB))
148 """ Add a local vdrift database as input. Use the option local 149 if the pset is processed locally and not with crab. 152 connect = os.path.abspath(self.options.inputVDriftDB)
154 connect = os.path.basename(self.options.inputVDriftDB)
156 moduleName =
'vDriftDB',
157 record =
'DTMtimeRcd',
159 connect =
'sqlite_file:%s' % connect)
160 self.input_files.append( os.path.abspath(self.options.inputVDriftDB) )
163 """ Add a local calib database as input. Use the option local 164 if the pset is processed locally and not with crab. 167 if self.options.datasettype ==
"Cosmics":
170 connect = os.path.abspath(self.options.inputCalibDB)
172 connect = os.path.basename(self.options.inputCalibDB)
174 moduleName =
'calibDB',
175 record =
'DTTtrigRcd',
177 connect =
str(
"sqlite_file:%s" % connect),
180 self.input_files.append( os.path.abspath(self.options.inputCalibDB) )
183 for option
in (
'inputDBRcd',
'connectStrDBTag'):
184 if hasattr(self.
options, option)
and not getattr(self.
options, option):
185 raise ValueError(
"Option %s needed for custom input db" % option)
187 record = self.options.inputDBRcd,
188 tag = self.options.inputDBTag,
189 connect = self.options.connectStrDBTag,
190 moduleName =
'customDB%s' % self.options.inputDBRcd
194 """ Common operations used in most prepare_[workflow_mode]_submit functions""" 195 if not self.options.run:
196 raise ValueError(
"Option run is required for submission!")
197 if hasattr(self.
options,
"inputT0DB")
and self.options.inputT0DB:
200 if hasattr(self.
options,
"inputVDriftDB")
and self.options.inputVDriftDB:
203 if hasattr(self.
options,
"inputDBTag")
and self.options.inputDBTag:
206 if self.options.run_on_RAW:
208 if self.options.preselection:
212 """ Common operations used in most prepare_[workflow_mode]_write functions""" 214 output_path = os.path.join( self.
local_path,
"unmerged_results" )
215 merged_file = os.path.join(self.
result_path, self.output_file)
218 if not (self.options.skip_stageout
or self.
files_reveived or self.options.no_exec):
220 log.info(
"Received files from storage element")
221 log.info(
"Using hadd to merge output files")
222 if not self.options.no_exec
and do_hadd:
225 raise RuntimeError(
"Failed to merge files with hadd")
226 return crabtask.crabConfig.Data.outputDatasetTag
230 self.process.calibDB.connect =
'sqlite_file:%s' % db_path
236 out_path = os.path.abspath(os.path.join(path,
237 os.path.splitext(db_path)[0] +
".txt"))
239 self.process.dumpToFile.outputFileName = out_path
246 connect=
'sqlite_file:',
251 calibDB = cms.ESSource(
"PoolDBESSource",
253 timetype = cms.string(
'runnumber'),
254 toGet = cms.VPSet(cms.PSet(
255 record = cms.string(record),
256 tag = cms.string(tag),
257 label = cms.untracked.string(label)
260 calibDB.connect = cms.string(
str(connect) )
262 if 'oracle:' in connect:
263 calibDB.DBParameters.authenticationPath =
'/afs/cern.ch/cms/DB/conddb' 264 setattr(process,moduleName,calibDB)
265 setattr(process,
"es_prefer_" + moduleName,cms.ESPrefer(
'PoolDBESSource',
270 self.crab.callCrabCommand( [
"getoutput",
273 crabtask.crabFolder ] )
276 """ Run a cmsRun job locally. The member variable self.pset_path is used 277 if pset_path argument is not given""" 278 if self.options.no_exec:
280 process = subprocess.Popen(
"cmsRun %s" % self.
pset_path,
281 stdout=subprocess.PIPE,
282 stderr=subprocess.STDOUT,
284 stdout = process.communicate()[0]
286 if process.returncode != 0:
287 raise RuntimeError(
"Failed to use cmsRun for pset %s" % self.
pset_name)
288 return process.returncode
292 """ Output path on remote excluding user base path 293 Returns a dict if crab is used due to crab path setting policy""" 294 if self.options.command ==
"submit":
296 "outLFNDirBase" : os.path.join(
"/store",
302 "outputDatasetTag" : self.
tag 305 return os.path.join(
'DTCalibration/',
307 'Run' +
str(self.options.run),
310 'v' +
str(self.options.trial),
314 if not self.options.workflow_mode
in self.outpath_workflow_mode_dict:
315 raise NotImplementedError(
"%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
316 return self.outpath_workflow_mode_dict[self.options.workflow_mode]
320 return 'Run' +
str(self.options.run) +
'_v' +
str(self.options.trial)
326 if hasattr(self.
options,
"user")
and self.options.user:
327 self.
_user = self.options.user
329 self.
_user = self.crab.checkusername()
334 """ Output path on local machine """ 335 if self.options.run
and self.options.label:
336 prefix =
"Run%d-%s_v%d" % ( self.options.run,
342 path = os.path.join( self.options.working_dir,
346 path = os.path.join( self.options.working_dir,
353 result_path = os.path.abspath(os.path.join(self.
local_path,
"results"))
354 if not os.path.exists(result_path):
355 os.makedirs(result_path)
360 """ Base path to folder containing pset files for cmsRun""" 361 return os.path.expandvars(os.path.join(
"$CMSSW_BASE",
370 """ full path to the pset file """ 371 basepath = os.path.join( self.
local_path,
"psets")
372 if not os.path.exists( basepath ):
373 os.makedirs( basepath )
374 return os.path.join( basepath, self.
pset_name )
377 if not hasattr(self,
"process"):
378 raise NameError(
"Process is not initalized in workflow object")
382 pfile.write(self.process.dumpPython())
385 """ Create the name for the output json file which will be dumped""" 387 command = self.options.command
388 return "config_" + command +
".json" 392 json.dump(vars(self.
options), out_file, indent=4)
395 if not os.path.exists(config_file_path):
396 raise IOError(
"File %s not found" % config_file_path)
397 with open(config_file_path,
"r") as input_file: 398 config_json = json.load(input_file) 399 for key, val
in config_json.items():
400 if not hasattr(self.
options, key)
or not getattr(self.
options, key):
401 setattr(self.
options, key, val)
404 """Load options for previous command in workflow """ 405 if not self.options.config_path:
406 if not self.options.run:
407 raise RuntimeError(
"Option run is required if no config path specified")
409 raise IOError(
"Local path %s does not exist" % self.
local_path)
410 self.options.config_path = os.path.join(self.
local_path,
def remote_out_path(self)
def add_local_vdrift_db(self, local=False)
def addPoolDBESSource(process, moduleName, record, tag, connect='sqlite_file:', label='')
def add_local_t0_db(self, local=False)
required_options_prepare_dict
def get_config_name(self, command="")
S & print(S &os, JobReport::InputFile const &f)
def add_local_calib_db(self, local=False)
def load_options_command(self, command)
def outpath_workflow_mode_tag(self)
def prepare_common_write(self, do_hadd=True)
def fill_required_options_dict(self)
def get_output_files(self, crabtask, output_path)
def check_missing_options(self, requirements_dict)
def crab_config_filepath(self)
def load_options(self, config_file_path)
def submit_crab_task(self)
static std::string join(char **cmd)
def __init__(self, options)
def fill_required_options_prepare_dict(self)
def runCMSSWtask(self, pset_path="")
def prepare_common_submit(self)
def prepare_common_dump(self, db_path)
def pset_template_base_bath(self)
def write_pset_file(self)
def add_preselection(self)
def prepare_workflow(self)
def add_local_custom_db(self)