1 from __future__
import print_function
2 from __future__
import absolute_import
13 from .CLIHelper
import CLIHelper
14 from .CrabHelper
import CrabHelper
15 import FWCore.ParameterSet.Config
as cms
16 log = logging.getLogger(__name__)
19 """ This is the base class for all DTWorkflows and contains some 41 os.chdir(self.options.working_dir)
46 if self.options.command
in requirements_dict:
47 for option
in requirements_dict[self.options.command]:
48 if not (hasattr(self.
options, option)
49 and ( (getattr(self.
options,option))
50 or isinstance(getattr(self.
options,option), bool) )):
51 missing_options.append(option)
52 if len(missing_options) > 0:
53 err =
"The following CLI options are missing" 54 err +=
" for command %s: " % self.options.command
55 err +=
" ".
join(missing_options)
59 """ Generalized function to run workflow command""" 60 msg =
"Preparing %s workflow" % self.options.workflow
61 if hasattr(self.
options,
"command"):
62 msg +=
" for command %s" % self.options.command
64 if self.options.config_path:
77 run_function = getattr(self, self.options.command)
78 except AttributeError:
79 errmsg =
"Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
80 if hasattr(self.
options,
"workflow_mode"):
81 errmsg +=
"and workflow mode %s" % self.options.workflow_mode
82 raise NotImplementedError( errmsg.format(self.__class__.__name__,
83 self.options.command))
84 log.debug(
"Running command %s" % self.options.command)
89 """ Abstract implementation of prepare workflow function""" 90 errmsg =
"Class `{}` does not implement `{}`" 91 raise NotImplementedError( errmsg.format(self.__class__.__name__,
95 """ generalized function to perform several workflow mode commands in chain. 96 All commands must be specified in self.all_commands list in workflow mode specific 97 prepare function in child workflow objects. 100 for command
in self.all_commands:
101 self.options.command = command
108 """ Function to check status of submitted tasks """ 121 """ Add preselection to the process object stored in workflow_object""" 122 if not hasattr(self,
"process"):
123 raise NameError(
"Process is not initalized in workflow object")
124 pathsequence = self.options.preselection.split(
':')[0]
125 seqname = self.options.preselection.split(
':')[1]
126 self.process.load(pathsequence)
134 """ Add a local t0 database as input. The option local is used 135 if the pset is processed locally and not with crab. 138 connect = os.path.abspath(self.options.inputT0DB)
140 connect = os.path.basename(self.options.inputT0DB)
145 connect =
'sqlite_file:%s' % connect)
146 self.input_files.append(os.path.abspath(self.options.inputT0DB))
149 """ Add a local vdrift database as input. The option local is used 150 if the pset is processed locally and not with crab. 153 connect = os.path.abspath(self.options.inputVDriftDB)
155 connect = os.path.basename(self.options.inputVDriftDB)
157 moduleName =
'vDriftDB',
158 record =
'DTMtimeRcd',
160 connect =
'sqlite_file:%s' % connect)
161 self.input_files.append( os.path.abspath(self.options.inputVDriftDB) )
164 """ Add a local calib database as input. The option local is used 165 if the pset is processed locally and not with crab. 168 if self.options.datasettype ==
"Cosmics":
171 connect = os.path.abspath(self.options.inputCalibDB)
173 connect = os.path.basename(self.options.inputCalibDB)
175 moduleName =
'calibDB',
176 record =
'DTTtrigRcd',
178 connect =
str(
"sqlite_file:%s" % connect),
181 self.input_files.append( os.path.abspath(self.options.inputCalibDB) )
184 for option
in (
'inputDBRcd',
'connectStrDBTag'):
185 if hasattr(self.
options, option)
and not getattr(self.
options, option):
186 raise ValueError(
"Option %s needed for custom input db" % option)
188 record = self.options.inputDBRcd,
189 tag = self.options.inputDBTag,
190 connect = self.options.connectStrDBTag,
191 moduleName =
'customDB%s' % self.options.inputDBRcd
195 """ Common operations used in most prepare_[workflow_mode]_submit functions""" 196 if not self.options.run:
197 raise ValueError(
"Option run is required for submission!")
198 if hasattr(self.
options,
"inputT0DB")
and self.options.inputT0DB:
201 if hasattr(self.
options,
"inputVDriftDB")
and self.options.inputVDriftDB:
204 if hasattr(self.
options,
"inputDBTag")
and self.options.inputDBTag:
207 if self.options.run_on_RAW:
209 if self.options.preselection:
213 """ Common operations used in most prepare_[workflow_mode]_write functions""" 215 output_path = os.path.join( self.
local_path,
"unmerged_results" )
216 merged_file = os.path.join(self.
result_path, self.output_file)
219 if not (self.options.skip_stageout
or self.
files_reveived or self.options.no_exec):
221 log.info(
"Received files from storage element")
222 log.info(
"Using hadd to merge output files")
223 if not self.options.no_exec
and do_hadd:
226 raise RuntimeError(
"Failed to merge files with hadd")
227 return crabtask.crabConfig.Data.outputDatasetTag
231 self.process.calibDB.connect =
'sqlite_file:%s' % db_path
237 out_path = os.path.abspath(os.path.join(path,
238 os.path.splitext(db_path)[0] +
".txt"))
240 self.process.dumpToFile.outputFileName = out_path
247 connect=
'sqlite_file:',
252 calibDB = cms.ESSource(
"PoolDBESSource",
254 timetype = cms.string(
'runnumber'),
255 toGet = cms.VPSet(cms.PSet(
256 record = cms.string(record),
257 tag = cms.string(tag),
258 label = cms.untracked.string(label)
261 calibDB.connect = cms.string(
str(connect) )
263 if 'oracle:' in connect:
264 calibDB.DBParameters.authenticationPath =
'/afs/cern.ch/cms/DB/conddb' 265 setattr(process,moduleName,calibDB)
266 setattr(process,
"es_prefer_" + moduleName,cms.ESPrefer(
'PoolDBESSource',
271 self.crab.callCrabCommand( [
"getoutput",
274 crabtask.crabFolder ] )
277 """ Run a cmsRun job locally. The member variable self.pset_path is used 278 if pset_path argument is not given""" 279 if self.options.no_exec:
281 process = subprocess.Popen(
"cmsRun %s" % self.
pset_path,
282 stdout=subprocess.PIPE,
283 stderr=subprocess.STDOUT,
285 stdout = process.communicate()[0]
287 if process.returncode != 0:
288 raise RuntimeError(
"Failed to use cmsRun for pset %s" % self.
pset_name)
289 return process.returncode
293 """ Output path on remote excluding user base path 294 Returns a dict if crab is used due to crab path setting policy""" 295 if self.options.command ==
"submit":
297 "outLFNDirBase" : os.path.join(
"/store",
303 "outputDatasetTag" : self.
tag 306 return os.path.join(
'DTCalibration/',
308 'Run' +
str(self.options.run),
311 'v' +
str(self.options.trial),
315 if not self.options.workflow_mode
in self.outpath_workflow_mode_dict:
316 raise NotImplementedError(
"%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
317 return self.outpath_workflow_mode_dict[self.options.workflow_mode]
321 return 'Run' +
str(self.options.run) +
'_v' +
str(self.options.trial)
327 if hasattr(self.
options,
"user")
and self.options.user:
328 self.
_user = self.options.user
330 self.
_user = self.crab.checkusername()
335 """ Output path on local machine """ 336 if self.options.run
and self.options.label:
337 prefix =
"Run%d-%s_v%d" % ( self.options.run,
343 path = os.path.join( self.options.working_dir,
347 path = os.path.join( self.options.working_dir,
354 result_path = os.path.abspath(os.path.join(self.
local_path,
"results"))
355 if not os.path.exists(result_path):
356 os.makedirs(result_path)
361 """ Base path to folder containing pset files for cmsRun""" 362 return os.path.expandvars(os.path.join(
"$CMSSW_BASE",
371 """ full path to the pset file """ 372 basepath = os.path.join( self.
local_path,
"psets")
373 if not os.path.exists( basepath ):
374 os.makedirs( basepath )
375 return os.path.join( basepath, self.
pset_name )
378 if not hasattr(self,
"process"):
379 raise NameError(
"Process is not initalized in workflow object")
383 pfile.write(self.process.dumpPython())
386 """ Create the name for the output json file which will be dumped""" 388 command = self.options.command
389 return "config_" + command +
".json" 393 json.dump(vars(self.
options), out_file, indent=4)
396 if not os.path.exists(config_file_path):
397 raise IOError(
"File %s not found" % config_file_path)
398 with open(config_file_path,
"r") as input_file: 399 config_json = json.load(input_file) 400 for key, val
in config_json.items():
401 if not hasattr(self.
options, key)
or not getattr(self.
options, key):
402 setattr(self.
options, key, val)
405 """Load options for previous command in workflow """ 406 if not self.options.config_path:
407 if not self.options.run:
408 raise RuntimeError(
"Option run is required if no config path specified")
410 raise IOError(
"Local path %s does not exist" % self.
local_path)
411 self.options.config_path = os.path.join(self.
local_path,
def remote_out_path(self)
def add_local_vdrift_db(self, local=False)
def addPoolDBESSource(process, moduleName, record, tag, connect='sqlite_file:', label='')
def add_local_t0_db(self, local=False)
required_options_prepare_dict
def get_config_name(self, command="")
S & print(S &os, JobReport::InputFile const &f)
def add_local_calib_db(self, local=False)
def load_options_command(self, command)
def outpath_workflow_mode_tag(self)
def prepare_common_write(self, do_hadd=True)
def fill_required_options_dict(self)
def get_output_files(self, crabtask, output_path)
def check_missing_options(self, requirements_dict)
def crab_config_filepath(self)
def load_options(self, config_file_path)
def submit_crab_task(self)
static std::string join(char **cmd)
def __init__(self, options)
def fill_required_options_prepare_dict(self)
def runCMSSWtask(self, pset_path="")
def prepare_common_submit(self)
def prepare_common_dump(self, db_path)
def pset_template_base_bath(self)
def write_pset_file(self)
def add_preselection(self)
def prepare_workflow(self)
def add_local_custom_db(self)