1 from __future__
import print_function
2 from __future__
import absolute_import
13 from .CLIHelper
import CLIHelper
14 from .CrabHelper
import CrabHelper
15 import FWCore.ParameterSet.Config
as cms
16 log = logging.getLogger(__name__)
19 """ This is the base class for all DTWorkflows and contains some 41 os.chdir(self.
options.working_dir)
46 if self.
options.command
in requirements_dict:
47 for option
in requirements_dict[self.
options.command]:
48 if not (hasattr(self.
options, option)
49 and ( (getattr(self.
options,option))
50 or isinstance(getattr(self.
options,option), bool) )):
51 missing_options.append(option)
52 if len(missing_options) > 0:
53 err =
"The following CLI options are missing" 54 err +=
" for command %s: " % self.
options.command
55 err +=
" ".
join(missing_options)
59 """ Generalized function to run workflow command""" 60 msg =
"Preparing %s workflow" % self.
options.workflow
61 if hasattr(self.
options,
"command"):
62 msg +=
" for command %s" % self.
options.command
77 run_function = getattr(self, self.
options.command)
78 except AttributeError:
79 errmsg =
"Class `{}` does not implement `{}` for workflow %s" % self.
options.workflow
80 if hasattr(self.
options,
"workflow_mode"):
81 errmsg +=
"and workflow mode %s" % self.
options.workflow_mode
82 raise NotImplementedError( errmsg.format(self.__class__.__name__,
84 log.debug(
"Running command %s" % self.
options.command)
89 """ Abstract implementation of prepare workflow function""" 90 errmsg =
"Class `{}` does not implement `{}`" 91 raise NotImplementedError( errmsg.format(self.__class__.__name__,
95 """ generalized function to perform several workflow mode commands in chain. 96 All commands mus be specified in self.all_commands list in workflow mode specific 97 prepare function in child workflow objects. 100 for command
in self.all_commands:
101 log.info(f
"Will run command: {command}")
109 """ Function to check status of submitted tasks """ 122 """ Add preselection to the process object stored in workflow_object""" 123 if not hasattr(self,
"process"):
124 raise NameError(
"Process is not initalized in workflow object")
125 pathsequence = self.
options.preselection.split(
':')[0]
126 seqname = self.
options.preselection.split(
':')[1]
135 """ Add a local t0 database as input. Use the option local is used 136 if the pset is processed locally and not with crab. 139 connect = os.path.abspath(self.
options.inputT0DB)
141 connect = os.path.basename(self.
options.inputT0DB)
146 connect =
'sqlite_file:%s' % connect)
150 """ Add a local vdrift database as input. Use the option local is used 151 if the pset is processed locally and not with crab. 154 connect = os.path.abspath(self.
options.inputVDriftDB)
156 connect = os.path.basename(self.
options.inputVDriftDB)
158 moduleName =
'vDriftDB',
159 record =
'DTMtimeRcd',
161 connect =
'sqlite_file:%s' % connect)
165 """ Add a local calib database as input. Use the option local is used 166 if the pset is processed locally and not with crab. 169 if self.
options.datasettype ==
"Cosmics":
172 connect = os.path.abspath(self.
options.inputCalibDB)
174 connect = os.path.basename(self.
options.inputCalibDB)
176 moduleName =
'calibDB',
177 record =
'DTTtrigRcd',
179 connect =
str(
"sqlite_file:%s" % connect),
185 for option
in (
'inputDBRcd',
'connectStrDBTag'):
186 if hasattr(self.
options, option)
and not getattr(self.
options, option):
187 raise ValueError(
"Option %s needed for custom input db" % option)
189 record = self.
options.inputDBRcd,
191 connect = self.
options.connectStrDBTag,
192 moduleName =
'customDB%s' % self.
options.inputDBRcd
196 """ Common operations used in most prepare_[workflow_mode]_submit functions""" 198 raise ValueError(
"Option run is required for submission!")
202 if hasattr(self.
options,
"inputVDriftDB")
and self.
options.inputVDriftDB:
205 if hasattr(self.
options,
"inputDBTag")
and self.
options.inputDBTag:
214 """ Common operations used in most prepare_[workflow_mode]_erite functions""" 216 output_path = os.path.join( self.
local_path,
"unmerged_results" )
217 merged_file = os.path.join(self.
result_path, self.output_file)
222 if "xrootd" not in output_files.keys():
223 raise RuntimeError(
"Could not get output files. No xrootd key found.")
224 if len(output_files[
"xrootd"]) == 0:
225 raise RuntimeError(
"Could not get output files. Output file list is empty.")
226 log.info(
"Received files from storage element")
227 log.info(
"Using hadd to merge output files")
228 if not self.
options.no_exec
and do_hadd:
231 raise RuntimeError(
"Failed to merge files with hadd")
232 return crabtask.crabConfig.Data.outputDatasetTag
236 self.
process.calibDB.connect =
'sqlite_file:%s' % db_path
242 out_path = os.path.abspath(os.path.join(path,
243 os.path.splitext(db_path)[0] +
".txt"))
245 self.
process.dumpToFile.outputFileName = out_path
252 connect='sqlite_file:
', 257 calibDB = cms.ESSource(
"PoolDBESSource",
259 timetype = cms.string(
'runnumber'),
260 toGet = cms.VPSet(cms.PSet(
261 record = cms.string(record),
262 tag = cms.string(tag),
263 label = cms.untracked.string(label)
266 calibDB.connect = cms.string(
str(connect) )
268 if 'oracle:' in connect:
269 calibDB.DBParameters.authenticationPath =
'/afs/cern.ch/cms/DB/conddb' 270 setattr(process,moduleName,calibDB)
271 setattr(process,
"es_prefer_" + moduleName,cms.ESPrefer(
'PoolDBESSource',
276 res = self.
crab.callCrabCommand( [
"getoutput",
279 crabtask.crabFolder ] )
284 """ Run a cmsRun job locally. The member variable self.pset_path is used 285 if pset_path argument is not given""" 288 process = subprocess.Popen(
"cmsRun %s" % self.
pset_path,
289 stdout=subprocess.PIPE,
290 stderr=subprocess.STDOUT,
292 stdout = process.communicate()[0]
294 if process.returncode != 0:
295 raise RuntimeError(
"Failed to use cmsRun for pset %s" % self.
pset_name)
296 return process.returncode
300 """ Output path on remote excluding user base path 301 Returns a dict if crab is used due to crab path setting policy""" 302 if self.
options.command ==
"submit":
304 "outLFNDirBase" : os.path.join(
"/store",
310 "outputDatasetTag" : self.
tag 313 return os.path.join(
'DTCalibration/',
322 if not self.
options.workflow_mode
in self.outpath_workflow_mode_dict:
323 raise NotImplementedError(
"%s missing in outpath_workflow_mode_dict" % self.
options.workflow_mode)
324 return self.outpath_workflow_mode_dict[self.
options.workflow_mode]
342 """ Output path on local machine """ 344 prefix =
"Run%d-%s_v%d" % ( self.
options.run,
350 path = os.path.join( self.
options.working_dir,
354 path = os.path.join( self.
options.working_dir,
361 result_path = os.path.abspath(os.path.join(self.
local_path,
"results"))
362 if not os.path.exists(result_path):
363 os.makedirs(result_path)
368 """ Base path to folder containing pset files for cmsRun""" 369 return os.path.expandvars(os.path.join(
"$CMSSW_BASE",
378 """ full path to the pset file """ 379 basepath = os.path.join( self.
local_path,
"psets")
380 if not os.path.exists( basepath ):
381 os.makedirs( basepath )
382 return os.path.join( basepath, self.
pset_name )
385 if not hasattr(self,
"process"):
386 raise NameError(
"Process is not initalized in workflow object")
393 """ Create the name for the output json file which will be dumped""" 396 return "config_" + command +
".json" 400 json.dump(vars(self.
options), out_file, indent=4)
403 if not os.path.exists(config_file_path):
404 raise IOError(
"File %s not found" % config_file_path)
405 with open(config_file_path,
"r") as input_file: 406 config_json = json.load(input_file) 407 for key, val
in config_json.items():
408 if not hasattr(self.
options, key)
or not getattr(self.
options, key):
409 setattr(self.
options, key, val)
412 """Load options for previous command in workflow """ 413 if not self.
options.config_path:
415 raise RuntimeError(
"Option run is required if no config path specified")
417 raise IOError(
"Local path %s does not exist" % self.
local_path)
def remote_out_path(self)
def add_local_vdrift_db(self, local=False)
def addPoolDBESSource(process, moduleName, record, tag, connect='sqlite_file:', label='')
def add_local_t0_db(self, local=False)
required_options_prepare_dict
def get_config_name(self, command="")
def add_local_calib_db(self, local=False)
def load_options_command(self, command)
def outpath_workflow_mode_tag(self)
def prepare_common_write(self, do_hadd=True)
def fill_required_options_dict(self)
def get_output_files(self, crabtask, output_path)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def check_missing_options(self, requirements_dict)
def crab_config_filepath(self)
def load_options(self, config_file_path)
def submit_crab_task(self)
static std::string join(char **cmd)
def __init__(self, options)
def dumpPython(process, name)
def fill_required_options_prepare_dict(self)
def runCMSSWtask(self, pset_path="")
def prepare_common_submit(self)
def prepare_common_dump(self, db_path)
def pset_template_base_bath(self)
def write_pset_file(self)
def add_preselection(self)
def prepare_workflow(self)
def add_local_custom_db(self)