11 from CLIHelper
import CLIHelper
12 from CrabHelper
import CrabHelper
13 import FWCore.ParameterSet.Config
as cms
14 log = logging.getLogger(__name__)
17 """ This is the base class for all DTWorkflows and contains some 39 os.chdir(self.options.working_dir)
44 if self.options.command
in requirements_dict:
45 for option
in requirements_dict[self.options.command]:
46 if not (hasattr(self.
options, option)
47 and ( (getattr(self.
options,option))
48 or type(getattr(self.
options,option)) == bool )):
49 missing_options.append(option)
50 if len(missing_options) > 0:
51 err =
"The following CLI options are missing" 52 err +=
" for command %s: " % self.options.command
53 err +=
" ".
join(missing_options)
57 """ Generalized function to run workflow command""" 58 msg =
"Preparing %s workflow" % self.options.workflow
59 if hasattr(self.
options,
"command"):
60 msg +=
" for command %s" % self.options.command
62 if self.options.config_path:
75 run_function = getattr(self, self.options.command)
76 except AttributeError:
77 errmsg =
"Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
78 if hasattr(self.
options,
"workflow_mode"):
79 errmsg +=
"and workflow mode %s" % self.options.workflow_mode
80 raise NotImplementedError( errmsg.format(self.__class__.__name__,
81 self.options.command))
82 log.debug(
"Running command %s" % self.options.command)
87 """ Abstract implementation of prepare workflow function""" 88 errmsg =
"Class `{}` does not implement `{}`" 89 raise NotImplementedError( errmsg.format(self.__class__.__name__,
93 """ generalized function to perform several workflow mode commands in chain. 94 All commands mus be specified in self.all_commands list in workflow mode specific 95 prepare function in child workflow objects. 98 for command
in self.all_commands:
99 self.options.command = command
106 """ Function to check status of submitted tasks """ 119 """ Add preselection to the process object stored in workflow_object""" 120 if not hasattr(self,
"process"):
121 raise NameError(
"Process is not initalized in workflow object")
122 pathsequence = self.options.preselection.split(
':')[0]
123 seqname = self.options.preselection.split(
':')[1]
124 self.process.load(pathsequence)
132 """ Add a local t0 database as input. Use the option local is used 133 if the pset is processed locally and not with crab. 136 connect = os.path.abspath(self.options.inputT0DB)
138 connect = os.path.basename(self.options.inputT0DB)
143 connect =
'sqlite_file:%s' % connect)
144 self.input_files.append(os.path.abspath(self.options.inputT0DB))
147 """ Add a local vdrift database as input. Use the option local is used 148 if the pset is processed locally and not with crab. 151 connect = os.path.abspath(self.options.inputVDriftDB)
153 connect = os.path.basename(self.options.inputVDriftDB)
155 moduleName =
'vDriftDB',
156 record =
'DTMtimeRcd',
158 connect =
'sqlite_file:%s' % connect)
159 self.input_files.append( os.path.abspath(self.options.inputVDriftDB) )
162 """ Add a local calib database as input. Use the option local is used 163 if the pset is processed locally and not with crab. 166 if self.options.datasettype ==
"Cosmics":
169 connect = os.path.abspath(self.options.inputCalibDB)
171 connect = os.path.basename(self.options.inputCalibDB)
173 moduleName =
'calibDB',
174 record =
'DTTtrigRcd',
176 connect =
str(
"sqlite_file:%s" % connect),
179 self.input_files.append( os.path.abspath(self.options.inputCalibDB) )
182 for option
in (
'inputDBRcd',
'connectStrDBTag'):
183 if hasattr(self.
options, option)
and not getattr(self.
options, option):
184 raise ValueError(
"Option %s needed for custom input db" % option)
186 record = self.options.inputDBRcd,
187 tag = self.options.inputDBTag,
188 connect = self.options.connectStrDBTag,
189 moduleName =
'customDB%s' % self.options.inputDBRcd
193 """ Common operations used in most prepare_[workflow_mode]_submit functions""" 194 if not self.options.run:
195 raise ValueError(
"Option run is required for submission!")
196 if hasattr(self.
options,
"inputT0DB")
and self.options.inputT0DB:
199 if hasattr(self.
options,
"inputVDriftDB")
and self.options.inputVDriftDB:
202 if hasattr(self.
options,
"inputDBTag")
and self.options.inputDBTag:
205 if self.options.run_on_RAW:
207 if self.options.preselection:
211 """ Common operations used in most prepare_[workflow_mode]_erite functions""" 213 output_path = os.path.join( self.
local_path,
"unmerged_results" )
214 merged_file = os.path.join(self.
result_path, self.output_file)
217 if not (self.options.skip_stageout
or self.
files_reveived or self.options.no_exec):
219 log.info(
"Received files from storage element")
220 log.info(
"Using hadd to merge output files")
221 if not self.options.no_exec
and do_hadd:
224 raise RuntimeError(
"Failed to merge files with hadd")
225 return crabtask.crabConfig.Data.outputDatasetTag
229 self.process.calibDB.connect =
'sqlite_file:%s' % db_path
235 out_path = os.path.abspath(os.path.join(path,
236 os.path.splitext(db_path)[0] +
".txt"))
238 self.process.dumpToFile.outputFileName = out_path
245 connect=
'sqlite_file:',
250 calibDB = cms.ESSource(
"PoolDBESSource",
252 timetype = cms.string(
'runnumber'),
253 toGet = cms.VPSet(cms.PSet(
254 record = cms.string(record),
255 tag = cms.string(tag),
256 label = cms.untracked.string(label)
259 calibDB.connect = cms.string(
str(connect) )
261 if 'oracle:' in connect:
262 calibDB.DBParameters.authenticationPath =
'/afs/cern.ch/cms/DB/conddb' 263 setattr(process,moduleName,calibDB)
264 setattr(process,
"es_prefer_" + moduleName,cms.ESPrefer(
'PoolDBESSource',
269 self.crab.callCrabCommand( [
"getoutput",
272 crabtask.crabFolder ] )
275 """ Run a cmsRun job locally. The member variable self.pset_path is used 276 if pset_path argument is not given""" 277 if self.options.no_exec:
279 process = subprocess.Popen(
"cmsRun %s" % self.
pset_path,
280 stdout=subprocess.PIPE,
281 stderr=subprocess.STDOUT,
283 stdout = process.communicate()[0]
285 if process.returncode != 0:
286 raise RuntimeError(
"Failed to use cmsRun for pset %s" % self.
pset_name)
287 return process.returncode
291 """ Output path on remote excluding user base path 292 Returns a dict if crab is used due to crab path setting policy""" 293 if self.options.command ==
"submit":
295 "outLFNDirBase" : os.path.join(
"/store",
301 "outputDatasetTag" : self.
tag 304 return os.path.join(
'DTCalibration/',
306 'Run' +
str(self.options.run),
309 'v' +
str(self.options.trial),
313 if not self.options.workflow_mode
in self.outpath_workflow_mode_dict:
314 raise NotImplementedError(
"%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
315 return self.outpath_workflow_mode_dict[self.options.workflow_mode]
319 return 'Run' +
str(self.options.run) +
'_v' +
str(self.options.trial)
325 if hasattr(self.
options,
"user")
and self.options.user:
326 self.
_user = self.options.user
328 self.
_user = self.crab.checkusername()
333 """ Output path on local machine """ 334 if self.options.run
and self.options.label:
335 prefix =
"Run%d-%s_v%d" % ( self.options.run,
341 path = os.path.join( self.options.working_dir,
345 path = os.path.join( self.options.working_dir,
352 result_path = os.path.abspath(os.path.join(self.
local_path,
"results"))
353 if not os.path.exists(result_path):
354 os.makedirs(result_path)
359 """ Base path to folder containing pset files for cmsRun""" 360 return os.path.expandvars(os.path.join(
"$CMSSW_BASE",
369 """ full path to the pset file """ 370 basepath = os.path.join( self.
local_path,
"psets")
371 if not os.path.exists( basepath ):
372 os.makedirs( basepath )
373 return os.path.join( basepath, self.
pset_name )
376 if not hasattr(self,
"process"):
377 raise NameError(
"Process is not initalized in workflow object")
381 pfile.write(self.process.dumpPython())
384 """ Create the name for the output json file which will be dumped""" 386 command = self.options.command
387 return "config_" + command +
".json" 391 json.dump(vars(self.
options), out_file, indent=4)
394 if not os.path.exists(config_file_path):
395 raise IOError(
"File %s not found" % config_file_path)
396 with open(config_file_path,
"r") as input_file: 397 config_json = json.load(input_file) 398 for key, val
in config_json.items():
399 if not hasattr(self.
options, key)
or not getattr(self.
options, key):
400 setattr(self.
options, key, val)
403 """Load options for previous command in workflow """ 404 if not self.options.config_path:
405 if not self.options.run:
406 raise RuntimeError(
"Option run is required if no config path specified")
408 raise IOError(
"Local path %s does not exist" % self.
local_path)
409 self.options.config_path = os.path.join(self.
local_path,
def remote_out_path(self)
def add_local_vdrift_db(self, local=False)
def addPoolDBESSource(process, moduleName, record, tag, connect='sqlite_file:', label='')
def add_local_t0_db(self, local=False)
required_options_prepare_dict
def get_config_name(self, command="")
def add_local_calib_db(self, local=False)
def load_options_command(self, command)
def outpath_workflow_mode_tag(self)
def prepare_common_write(self, do_hadd=True)
def fill_required_options_dict(self)
def get_output_files(self, crabtask, output_path)
def check_missing_options(self, requirements_dict)
def crab_config_filepath(self)
def load_options(self, config_file_path)
def submit_crab_task(self)
static std::string join(char **cmd)
def __init__(self, options)
def fill_required_options_prepare_dict(self)
def runCMSSWtask(self, pset_path="")
def prepare_common_submit(self)
def prepare_common_dump(self, db_path)
def pset_template_base_bath(self)
def write_pset_file(self)
def add_preselection(self)
def prepare_workflow(self)
def add_local_custom_db(self)