1 from __future__
import print_function
2 from __future__
import absolute_import
13 from .CLIHelper
import CLIHelper
14 from .CrabHelper
import CrabHelper
15 import FWCore.ParameterSet.Config
as cms
16 log = logging.getLogger(__name__)
19 """ This is the base class for all DTWorkflows and contains some
41 os.chdir(self.
options.working_dir)
46 if self.
options.command
in requirements_dict:
47 for option
in requirements_dict[self.
options.command]:
48 if not (hasattr(self.
options, option)
49 and ( (getattr(self.
options,option))
50 or isinstance(getattr(self.
options,option), bool) )):
51 missing_options.append(option)
52 if len(missing_options) > 0:
53 err =
"The following CLI options are missing"
54 err +=
" for command %s: " % self.
options.command
55 err +=
" ".
join(missing_options)
59 """ Generalized function to run workflow command"""
60 msg =
"Preparing %s workflow" % self.
options.workflow
61 if hasattr(self.
options,
"command"):
62 msg +=
" for command %s" % self.
options.command
77 run_function = getattr(self, self.
options.command)
78 except AttributeError:
79 errmsg =
"Class `{}` does not implement `{}` for workflow %s" % self.
options.workflow
80 if hasattr(self.
options,
"workflow_mode"):
81 errmsg +=
"and workflow mode %s" % self.
options.workflow_mode
82 raise NotImplementedError( errmsg.format(self.__class__.__name__,
84 log.debug(
"Running command %s" % self.
options.command)
89 """ Abstract implementation of prepare workflow function"""
90 errmsg =
"Class `{}` does not implement `{}`"
91 raise NotImplementedError( errmsg.format(self.__class__.__name__,
95 """ generalized function to perform several workflow mode commands in chain.
96 All commands mus be specified in self.all_commands list in workflow mode specific
97 prepare function in child workflow objects.
100 for command
in self.all_commands:
108 """ Function to check status of submitted tasks """
121 """ Add preselection to the process object stored in workflow_object"""
122 if not hasattr(self,
"process"):
123 raise NameError(
"Process is not initalized in workflow object")
124 pathsequence = self.
options.preselection.split(
':')[0]
125 seqname = self.
options.preselection.split(
':')[1]
134 """ Add a local t0 database as input. Use the option local is used
135 if the pset is processed locally and not with crab.
138 connect = os.path.abspath(self.
options.inputT0DB)
140 connect = os.path.basename(self.
options.inputT0DB)
145 connect =
'sqlite_file:%s' % connect)
149 """ Add a local vdrift database as input. Use the option local is used
150 if the pset is processed locally and not with crab.
153 connect = os.path.abspath(self.
options.inputVDriftDB)
155 connect = os.path.basename(self.
options.inputVDriftDB)
157 moduleName =
'vDriftDB',
158 record =
'DTMtimeRcd',
160 connect =
'sqlite_file:%s' % connect)
164 """ Add a local calib database as input. Use the option local is used
165 if the pset is processed locally and not with crab.
168 if self.
options.datasettype ==
"Cosmics":
171 connect = os.path.abspath(self.
options.inputCalibDB)
173 connect = os.path.basename(self.
options.inputCalibDB)
175 moduleName =
'calibDB',
176 record =
'DTTtrigRcd',
178 connect =
str(
"sqlite_file:%s" % connect),
184 for option
in (
'inputDBRcd',
'connectStrDBTag'):
185 if hasattr(self.
options, option)
and not getattr(self.
options, option):
186 raise ValueError(
"Option %s needed for custom input db" % option)
188 record = self.
options.inputDBRcd,
190 connect = self.
options.connectStrDBTag,
191 moduleName =
'customDB%s' % self.
options.inputDBRcd
195 """ Common operations used in most prepare_[workflow_mode]_submit functions"""
197 raise ValueError(
"Option run is required for submission!")
201 if hasattr(self.
options,
"inputVDriftDB")
and self.
options.inputVDriftDB:
204 if hasattr(self.
options,
"inputDBTag")
and self.
options.inputDBTag:
213 """ Common operations used in most prepare_[workflow_mode]_erite functions"""
215 output_path = os.path.join( self.
local_path,
"unmerged_results" )
216 merged_file = os.path.join(self.
result_path, self.output_file)
221 log.info(
"Received files from storage element")
222 log.info(
"Using hadd to merge output files")
223 if not self.
options.no_exec
and do_hadd:
226 raise RuntimeError(
"Failed to merge files with hadd")
227 return crabtask.crabConfig.Data.outputDatasetTag
231 self.
process.calibDB.connect =
'sqlite_file:%s' % db_path
237 out_path = os.path.abspath(os.path.join(path,
238 os.path.splitext(db_path)[0] +
".txt"))
240 self.
process.dumpToFile.outputFileName = out_path
247 connect='sqlite_file:
',
252 calibDB = cms.ESSource(
"PoolDBESSource",
254 timetype = cms.string(
'runnumber'),
255 toGet = cms.VPSet(cms.PSet(
256 record = cms.string(record),
257 tag = cms.string(tag),
258 label = cms.untracked.string(label)
261 calibDB.connect = cms.string(
str(connect) )
263 if 'oracle:' in connect:
264 calibDB.DBParameters.authenticationPath =
'/afs/cern.ch/cms/DB/conddb'
265 setattr(process,moduleName,calibDB)
266 setattr(process,
"es_prefer_" + moduleName,cms.ESPrefer(
'PoolDBESSource',
271 self.
crab.callCrabCommand( [
"getoutput",
274 crabtask.crabFolder ] )
277 """ Run a cmsRun job locally. The member variable self.pset_path is used
278 if pset_path argument is not given"""
281 process = subprocess.Popen(
"cmsRun %s" % self.
pset_path,
282 stdout=subprocess.PIPE,
283 stderr=subprocess.STDOUT,
285 stdout = process.communicate()[0]
287 if process.returncode != 0:
288 raise RuntimeError(
"Failed to use cmsRun for pset %s" % self.
pset_name)
289 return process.returncode
293 """ Output path on remote excluding user base path
294 Returns a dict if crab is used due to crab path setting policy"""
295 if self.
options.command ==
"submit":
297 "outLFNDirBase" : os.path.join(
"/store",
303 "outputDatasetTag" : self.
tag
306 return os.path.join(
'DTCalibration/',
315 if not self.
options.workflow_mode
in self.outpath_workflow_mode_dict:
316 raise NotImplementedError(
"%s missing in outpath_workflow_mode_dict" % self.
options.workflow_mode)
317 return self.outpath_workflow_mode_dict[self.
options.workflow_mode]
335 """ Output path on local machine """
337 prefix =
"Run%d-%s_v%d" % ( self.
options.run,
343 path = os.path.join( self.
options.working_dir,
347 path = os.path.join( self.
options.working_dir,
354 result_path = os.path.abspath(os.path.join(self.
local_path,
"results"))
355 if not os.path.exists(result_path):
356 os.makedirs(result_path)
361 """ Base path to folder containing pset files for cmsRun"""
362 return os.path.expandvars(os.path.join(
"$CMSSW_BASE",
371 """ full path to the pset file """
372 basepath = os.path.join( self.
local_path,
"psets")
373 if not os.path.exists( basepath ):
374 os.makedirs( basepath )
375 return os.path.join( basepath, self.
pset_name )
378 if not hasattr(self,
"process"):
379 raise NameError(
"Process is not initalized in workflow object")
386 """ Create the name for the output json file which will be dumped"""
389 return "config_" + command +
".json"
393 json.dump(vars(self.
options), out_file, indent=4)
396 if not os.path.exists(config_file_path):
397 raise IOError(
"File %s not found" % config_file_path)
398 with open(config_file_path,
"r")
as input_file:
399 config_json = json.load(input_file)
400 for key, val
in config_json.items():
401 if not hasattr(self.
options, key)
or not getattr(self.
options, key):
402 setattr(self.
options, key, val)
405 """Load options for previous command in workflow """
406 if not self.
options.config_path:
408 raise RuntimeError(
"Option run is required if no config path specified")
410 raise IOError(
"Local path %s does not exist" % self.
local_path)