CMS 3D CMS Logo

DTWorkflow.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from __future__ import absolute_import
3 import os,sys
4 import glob
5 import logging
6 import argparse
7 import subprocess
8 import time, datetime
9 import urllib
10 import json
11 
12 from . import tools
13 from .CLIHelper import CLIHelper
14 from .CrabHelper import CrabHelper
15 import FWCore.ParameterSet.Config as cms
16 log = logging.getLogger(__name__)
17 
19  """ This is the base class for all DTWorkflows and contains some
20  common tasks """
21  def __init__(self, options):
22  self.options = options
23  super( DTWorkflow, self ).__init__()
24  self.digilabel = "muonDTDigis"
25  # dict to hold required variables. Can not be marked in argparse to allow
26  # loading of options from config
31  # These variables are determined in the derived classes
32  self.pset_name = ""
34  self.output_files = []
35  self.input_files = []
36 
37  self.run_all_command = False
38  self.files_reveived = False
39  self._user = ""
40  # change to working directory
41  os.chdir(self.options.working_dir)
42 
def check_missing_options(self, requirements_dict):
    """Verify that every option required by the current command is set.

    An option counts as provided when ``self.options`` has the attribute and
    its value is either truthy or a ``bool`` (so an explicit ``False`` is
    accepted).

    Args:
        requirements_dict: mapping of command name to the option names that
            command needs. Commands without an entry need nothing.

    Raises:
        ValueError: listing every required option that is absent or empty.
    """
    command = self.options.command
    required = requirements_dict[command] if command in requirements_dict else ()

    def is_provided(name):
        if not hasattr(self.options, name):
            return False
        value = getattr(self.options, name)
        return bool(value) or isinstance(value, bool)

    absent = [name for name in required if not is_provided(name)]
    if not absent:
        return
    raise ValueError("The following CLI options are missing"
                     + " for command %s: " % self.options.command
                     + " ".join(absent))
57 
58  def run(self):
59  """ Generalized function to run workflow command"""
60  msg = "Preparing %s workflow" % self.options.workflow
61  if hasattr(self.options, "command"):
62  msg += " for command %s" % self.options.command
63  log.info(msg)
64  if self.options.config_path:
65  self.load_options( self.options.config_path )
66  #check if all options to prepare the command are used
68  self.prepare_workflow()
69  # create output folder if they do not exist yet
70  if not os.path.exists( self.local_path ):
71  os.makedirs(self.local_path)
72  # dump used options
73  self.dump_options()
74  #check if all options to run the command are used
76  try:
77  run_function = getattr(self, self.options.command)
78  except AttributeError:
79  errmsg = "Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
80  if hasattr(self.options, "workflow_mode"):
81  errmsg += "and workflow mode %s" % self.options.workflow_mode
82  raise NotImplementedError( errmsg.format(self.__class__.__name__,
83  self.options.command))
84  log.debug("Running command %s" % self.options.command)
85  # call chosen function
86  run_function()
87 
def prepare_workflow(self):
    """Placeholder that concrete workflows have to override.

    Raises:
        NotImplementedError: always; the message names the concrete class.
    """
    template = "Class `{}` does not implement `{}`"
    class_name = self.__class__.__name__
    raise NotImplementedError(template.format(class_name, "prepare_workflow"))
93 
def all(self):
    """Run every command listed in ``self.all_commands``, in order.

    Sets ``self.run_all_command`` and, for each entry, stores it in
    ``self.options.command`` before calling ``self.run()``. The command list
    has to be provided by the workflow-mode specific prepare function of the
    child workflow.
    """
    self.run_all_command = True
    for step in self.all_commands:
        log.info(f"Will run command: {step}")
        self.options.command = step
        self.run()
104 
def submit(self):
    """Submit the workflow by delegating to ``self.submit_crab_task()``."""
    self.submit_crab_task()
107 
def check(self):
    """Check the status of submitted tasks via ``self.check_crabtask()``."""
    self.check_crabtask()
111 
def write(self):
    """Run the ``write`` command by delegating to ``self.runCMSSWtask()``."""
    self.runCMSSWtask()
114 
def dump(self):
    """Run the ``dump`` command by delegating to ``self.runCMSSWtask()``."""
    self.runCMSSWtask()
117 
def correction(self):
    """Run the ``correction`` command by delegating to ``self.runCMSSWtask()``."""
    self.runCMSSWtask()
120 
def add_preselection(self):
    """Load the preselection config and prepend its sequence to the paths.

    ``self.options.preselection`` is read as ``"<module>:<sequence>"``: the
    part before the first colon is loaded into ``self.process`` and the
    second part is handed to ``tools.prependPaths``.

    Raises:
        NameError: if ``self.process`` has not been set.
        IndexError: if the preselection string contains no colon.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    fields = self.options.preselection.split(':')
    module_path, sequence_name = fields[0], fields[1]
    self.process.load(module_path)
    tools.prependPaths(self.process, sequence_name)
129 
130  def add_raw_option(self):
131  getattr(self.process, self.digilabel).inputLabel = self.options.raw_data_label
133 
def add_local_t0_db(self, local=False):
    """Register a local t0 sqlite file as a conditions source.

    Args:
        local: when true (pset processed locally) the connect string uses
            the absolute path of ``self.options.inputT0DB``; otherwise (pset
            processed with crab) only the file's base name is used.

    The absolute path of the file is also appended to ``self.input_files``.
    """
    db_file = self.options.inputT0DB
    locate = os.path.abspath if local else os.path.basename
    self.addPoolDBESSource(process=self.process,
                           moduleName='t0DB',
                           record='DTT0Rcd',
                           tag='t0',
                           connect='sqlite_file:%s' % locate(db_file))
    self.input_files.append(os.path.abspath(db_file))
148 
def add_local_vdrift_db(self, local=False):
    """Register a local vdrift sqlite file as a conditions source.

    Args:
        local: when true (pset processed locally) the connect string uses
            the absolute path of ``self.options.inputVDriftDB``; otherwise
            (pset processed with crab) only the file's base name is used.

    The absolute path of the file is also appended to ``self.input_files``.
    """
    db_file = self.options.inputVDriftDB
    locate = os.path.abspath if local else os.path.basename
    self.addPoolDBESSource(process=self.process,
                           moduleName='vDriftDB',
                           record='DTMtimeRcd',
                           tag='vDrift',
                           connect='sqlite_file:%s' % locate(db_file))
    self.input_files.append(os.path.abspath(db_file))
163 
def add_local_calib_db(self, local=False):
    """Register a local calibration (ttrig) sqlite file as a conditions source.

    The source is labelled ``'cosmics'`` when ``self.options.datasettype`` is
    ``"Cosmics"`` and left unlabelled otherwise.

    Args:
        local: when true (pset processed locally) the connect string uses
            the absolute path of ``self.options.inputCalibDB``; otherwise
            (pset processed with crab) only the file's base name is used.

    The absolute path of the file is also appended to ``self.input_files``.
    """
    label = 'cosmics' if self.options.datasettype == "Cosmics" else ''
    db_file = self.options.inputCalibDB
    locate = os.path.abspath if local else os.path.basename
    self.addPoolDBESSource(process=self.process,
                           moduleName='calibDB',
                           record='DTTtrigRcd',
                           tag='ttrig',
                           connect=str("sqlite_file:%s" % locate(db_file)),
                           label=label)
    self.input_files.append(os.path.abspath(db_file))
183 
def add_local_custom_db(self):
    """Register a user-specified conditions source on ``self.process``.

    The record, tag and connect string are taken from the ``inputDBRcd``,
    ``inputDBTag`` and ``connectStrDBTag`` options; the module is named
    ``customDB<inputDBRcd>``.

    Raises:
        ValueError: if ``inputDBRcd`` or ``connectStrDBTag`` is absent from
            the options or empty.
    """
    for option in ('inputDBRcd', 'connectStrDBTag'):
        # An absent attribute is as unusable as an empty one; report both the
        # same way instead of failing later with an AttributeError.
        if not getattr(self.options, option, None):
            raise ValueError("Option %s needed for custom input db" % option)
    self.addPoolDBESSource(process=self.process,
                           record=self.options.inputDBRcd,
                           tag=self.options.inputDBTag,
                           connect=self.options.connectStrDBTag,
                           moduleName='customDB%s' % self.options.inputDBRcd)
194 
def prepare_common_submit(self):
    """ Common operations used in most prepare_[workflow_mode]_submit functions.

    Requires the ``run`` option, then adds whichever optional inputs are
    configured, in this order: local t0 database, local vdrift database,
    custom database, RAW data option, preselection.

    Raises:
        ValueError: if ``self.options.run`` is not set.
    """
    if not self.options.run:
        raise ValueError("Option run is required for submission!")
    # These three options may be absent from the namespace altogether.
    if getattr(self.options, "inputT0DB", None):
        self.add_local_t0_db()

    if getattr(self.options, "inputVDriftDB", None):
        self.add_local_vdrift_db()

    if getattr(self.options, "inputDBTag", None):
        self.add_local_custom_db()

    if self.options.run_on_RAW:
        self.add_raw_option()
    if self.options.preselection:
        self.add_preselection()
212 
def prepare_common_write(self, do_hadd=True):
    """ Common operations used in most prepare_[workflow_mode]_write functions.

    Loads the options stored for the ``submit`` command, builds the crab
    task object and, unless ``skip_stageout``, ``files_reveived`` or
    ``no_exec`` is set, asks crab for the list of output files. The
    retrieved files are then merged with hadd into
    ``<result_path>/<output_file>``.

    Args:
        do_hadd: merge the retrieved files when true.

    Returns:
        The ``crabConfig.Data.outputDatasetTag`` value of the crab task.

    Raises:
        RuntimeError: if the file list has no ``xrootd`` entry, if that
            entry is empty, or if hadd returns a non-zero code.
    """
    self.load_options_command("submit")
    # NOTE(review): output_path is passed on but get_output_files ignores it.
    output_path = os.path.join(self.local_path, "unmerged_results")
    merged_file = os.path.join(self.result_path, self.output_file)
    crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                           initUpdate=False)
    # Stays None when the stage-out step is skipped, so the merge step below
    # can tell "nothing fetched" apart from a fetched list.
    output_files = None
    if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
        output_files = self.get_output_files(crabtask, output_path)
        if "xrootd" not in output_files.keys():
            raise RuntimeError("Could not get output files. No xrootd key found.")
        if len(output_files["xrootd"]) == 0:
            raise RuntimeError("Could not get output files. Output file list is empty.")
        log.info("Received files from storage element")
        log.info("Using hadd to merge output files")
    # Only merge a list that was actually retrieved in this call; referencing
    # it unconditionally failed with an unbound local when stage-out was
    # skipped but execution was not.
    if output_files is not None and do_hadd and not self.options.no_exec:
        returncode = tools.haddLocal(output_files["xrootd"], merged_file)
        if returncode != 0:
            raise RuntimeError("Failed to merge files with hadd")
    return crabtask.crabConfig.Data.outputDatasetTag
233 
def prepare_common_dump(self, db_path):
    """Load the dump pset template and point it at a sqlite database.

    Sets the ``calibDB`` connect string of the loaded process to
    ``db_path`` and the ``dumpToFile`` output file to a ``.txt`` file named
    after ``db_path``, placed below ``self.result_path`` (or below the
    current working directory when the result path cannot be determined).

    Args:
        db_path: path of the sqlite file to dump.
    """
    self.process = tools.loadCmsProcess(self.pset_template)
    self.process.calibDB.connect = 'sqlite_file:%s' % db_path
    try:
        path = self.result_path
    except Exception:
        # Best-effort fallback, but no longer swallowing KeyboardInterrupt /
        # SystemExit the way a bare ``except`` did.
        path = os.getcwd()
    print("path", path)
    # NOTE: os.path.join discards ``path`` when db_path is absolute.
    out_path = os.path.abspath(os.path.join(path,
                                            os.path.splitext(db_path)[0] + ".txt"))

    self.process.dumpToFile.outputFileName = out_path
246 
@staticmethod
def addPoolDBESSource( process,
                       moduleName,
                       record,
                       tag,
                       connect='sqlite_file:',
                       label='',):
    """Attach a ``PoolDBESSource`` and a matching ``ESPrefer`` to a process.

    Args:
        process: object receiving the attributes ``moduleName`` and
            ``es_prefer_<moduleName>``.
        moduleName: attribute name used for the new source.
        record: record name requested from the source.
        tag: tag requested for that record.
        connect: connect string; when it contains ``'oracle:'`` the
            authentication path is set to ``/afs/cern.ch/cms/DB/conddb``.
        label: untracked label of the requested record.
    """
    from CondCore.CondDB.CondDB_cfi import CondDB

    request = cms.PSet(record = cms.string(record),
                       tag = cms.string(tag),
                       label = cms.untracked.string(label))
    source = cms.ESSource("PoolDBESSource",
                          CondDB,
                          timetype = cms.string('runnumber'),
                          toGet = cms.VPSet(request))
    source.connect = cms.string( str(connect) )
    if 'oracle:' in connect:
        source.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
    setattr(process, moduleName, source)
    prefer = cms.ESPrefer('PoolDBESSource', moduleName)
    setattr(process, "es_prefer_" + moduleName, prefer)
274 
def get_output_files(self, crabtask, output_path):
    """Ask crab for the output file list of a task.

    Runs ``getoutput --dump --xrootd`` on the task's crab folder through
    ``self.crab.callCrabCommand`` and returns whatever that call returns.

    Args:
        crabtask: object whose ``crabFolder`` attribute names the task folder.
        output_path: accepted for interface compatibility; not used here.
    """
    command = ["getoutput", "--dump", "--xrootd", crabtask.crabFolder]
    return self.crab.callCrabCommand(command)
282 
def runCMSSWtask(self, pset_path=""):
    """ Run a cmsRun job locally and log its combined stdout/stderr.

    Args:
        pset_path: configuration file to run. The member variable
            ``self.pset_path`` is used if this argument is not given.

    Returns:
        0, either because ``self.options.no_exec`` is set (nothing is run)
        or because cmsRun finished successfully.

    Raises:
        RuntimeError: if cmsRun exits with a non-zero return code.
    """
    if self.options.no_exec:
        return 0
    # Honour the argument; it used to be ignored in favour of self.pset_path.
    target = pset_path or self.pset_path
    # An argument list (no shell) passes the path through verbatim, so
    # spaces or shell metacharacters in it cannot alter the command.
    process = subprocess.Popen(["cmsRun", target],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    stdout = process.communicate()[0]
    # Decode leniently so the log holds readable text rather than a bytes repr.
    log.info(stdout.decode("utf-8", "replace"))
    if process.returncode != 0:
        raise RuntimeError("Failed to use cmsRun for pset %s"
                           % (pset_path or self.pset_name))
    return process.returncode
297 
298  @property
299  def remote_out_path(self):
300  """ Output path on remote excluding user base path
301  Returns a dict if crab is used due to crab path setting policy"""
302  if self.options.command =="submit":
303  return {
304  "outLFNDirBase" : os.path.join( "/store",
305  "user",
306  self.user,
307  'DTCalibration/',
308  self.outpath_command_tag,
310  "outputDatasetTag" : self.tag
311  }
312  else:
313  return os.path.join( 'DTCalibration/',
314  datasetstr,
315  'Run' + str(self.options.run),
316  self.outpath_command_tag,
318  'v' + str(self.options.trial),
319  )
@property
def outpath_workflow_mode_tag(self):
    """Output path tag registered for the current workflow mode.

    Looks up ``self.options.workflow_mode`` in
    ``self.outpath_workflow_mode_dict``.

    Raises:
        NotImplementedError: if the workflow mode has no entry.
    """
    mode = self.options.workflow_mode
    if mode not in self.outpath_workflow_mode_dict:
        raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % mode)
    return self.outpath_workflow_mode_dict[mode]
325 
@property
def tag(self):
    """Tag of the form ``Run<run>_v<trial>`` built from the options."""
    run_number = self.options.run
    trial_number = self.options.trial
    return 'Run%s_v%s' % (str(run_number), str(trial_number))
329 
@property
def user(self):
    """User name, resolved once and cached in ``self._user``.

    A non-empty ``user`` option wins; otherwise the name is obtained from
    ``self.crab.checkusername()``.
    """
    if not self._user:
        explicit = getattr(self.options, "user", None)
        self._user = explicit if explicit else self.crab.checkusername()
    return self._user
339 
340  @property
341  def local_path(self):
342  """ Output path on local machine """
343  if self.options.run and self.options.label:
344  prefix = "Run%d-%s_v%d" % ( self.options.run,
345  self.options.label,
346  self.options.trial)
347  else:
348  prefix = ""
350  path = os.path.join( self.options.working_dir,
351  prefix,
353  else:
354  path = os.path.join( self.options.working_dir,
355  prefix,
356  self.outpath_command_tag )
357  return path
358 
@property
def result_path(self):
    """Absolute path of the ``results`` folder below ``self.local_path``.

    The folder is created on access if it does not exist yet.
    """
    result_path = os.path.abspath(os.path.join(self.local_path, "results"))
    # exist_ok closes the window between an existence check and the creation.
    os.makedirs(result_path, exist_ok=True)
    return result_path
365 
@property
def pset_template_base_bath(self):
    """ Base path to folder containing pset files for cmsRun.

    Built as ``$CMSSW_BASE/src/CalibMuon/test`` with environment variables
    expanded.
    """
    relative = os.path.join("$CMSSW_BASE", "src", "CalibMuon", "test")
    return os.path.expandvars(relative)
375 
@property
def pset_path(self):
    """ Full path to the pset file.

    The file lives in the ``psets`` folder below ``self.local_path``; that
    folder is created on access if it does not exist yet.
    """
    basepath = os.path.join(self.local_path, "psets")
    # exist_ok closes the window between an existence check and the creation.
    os.makedirs(basepath, exist_ok=True)
    return os.path.join(basepath, self.pset_name)
383 
def write_pset_file(self):
    """Write ``self.process.dumpPython()`` to the file at ``self.pset_path``.

    ``self.local_path`` is created first if it does not exist.

    Raises:
        NameError: if ``self.process`` has not been set.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    workdir = self.local_path
    if not os.path.exists(workdir):
        os.makedirs(workdir)
    with open(self.pset_path, 'w') as pset_file:
        pset_file.write(self.process.dumpPython())
391 
def get_config_name(self, command= ""):
    """Return the name of the json file holding the dumped options.

    Args:
        command: command the file belongs to; ``self.options.command`` is
            used when this is empty.

    Returns:
        ``config_<command>.json``.
    """
    chosen = command if command else self.options.command
    return "".join(("config_", chosen, ".json"))
397 
def dump_options(self):
    """Write all current options as json into ``self.local_path``.

    The file name comes from ``self.get_config_name()``.
    """
    target = os.path.join(self.local_path, self.get_config_name())
    with open(target, "w") as out_file:
        json.dump(vars(self.options), out_file, indent=4)
401 
def load_options(self, config_file_path):
    """Fill unset options from a json config file.

    A stored value is applied only when ``self.options`` lacks the
    attribute or holds a falsy value for it; options that are already set
    are left untouched.

    Args:
        config_file_path: path of the json file to read.

    Raises:
        IOError: if the file does not exist.
    """
    if not os.path.exists(config_file_path):
        raise IOError("File %s not found" % config_file_path)
    with open(config_file_path, "r") as input_file:
        stored = json.load(input_file)
    for name, value in stored.items():
        already_set = hasattr(self.options, name) and getattr(self.options, name)
        if not already_set:
            setattr(self.options, name, value)
410 
def load_options_command(self, command ):
    """Load the options dumped by a previous command of the workflow.

    When no config path is set, it is derived as the config file of
    ``command`` inside ``self.local_path`` and stored in
    ``self.options.config_path``. The file is then read with
    ``self.load_options``.

    Args:
        command: name of the earlier command whose config should be loaded.

    Raises:
        RuntimeError: if neither a config path nor the ``run`` option is set.
        IOError: if the config path must be derived but ``self.local_path``
            does not exist.
    """
    options = self.options
    if not options.config_path:
        if not options.run:
            raise RuntimeError("Option run is required if no config path specified")
        workdir = self.local_path
        if not os.path.exists(workdir):
            raise IOError("Local path %s does not exist" % self.local_path)
        options.config_path = os.path.join(self.local_path,
                                           self.get_config_name(command))
    self.load_options(options.config_path)
421 
def remote_out_path(self)
Definition: DTWorkflow.py:299
def add_local_vdrift_db(self, local=False)
Definition: DTWorkflow.py:149
def addPoolDBESSource(process, moduleName, record, tag, connect='sqlite_file:', label='')
Definition: DTWorkflow.py:253
def add_local_t0_db(self, local=False)
Definition: DTWorkflow.py:134
def get_config_name(self, command="")
Definition: DTWorkflow.py:392
def add_local_calib_db(self, local=False)
Definition: DTWorkflow.py:164
def load_options_command(self, command)
Definition: DTWorkflow.py:411
def outpath_workflow_mode_tag(self)
Definition: DTWorkflow.py:321
def prepare_common_write(self, do_hadd=True)
Definition: DTWorkflow.py:213
def fill_required_options_dict(self)
Definition: CLIHelper.py:39
def haddLocal(files, result_file, extension='root')
Definition: tools.py:41
def get_output_files(self, crabtask, output_path)
Definition: DTWorkflow.py:275
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def check_missing_options(self, requirements_dict)
Definition: DTWorkflow.py:43
def crab_config_filepath(self)
Definition: CrabHelper.py:232
def prependPaths(process, seqname)
Definition: tools.py:64
def load_options(self, config_file_path)
Definition: DTWorkflow.py:402
def loadCmsProcess(psetPath)
Definition: tools.py:54
def submit_crab_task(self)
Definition: CrabHelper.py:23
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def __init__(self, options)
Definition: DTWorkflow.py:21
def add_raw_option(self)
Definition: DTWorkflow.py:130
def load(fileName)
Definition: svgfig.py:547
def dumpPython(process, name)
def fill_required_options_prepare_dict(self)
Definition: CLIHelper.py:35
def runCMSSWtask(self, pset_path="")
Definition: DTWorkflow.py:283
def prepare_common_submit(self)
Definition: DTWorkflow.py:195
def prepare_common_dump(self, db_path)
Definition: DTWorkflow.py:234
def dump_options(self)
Definition: DTWorkflow.py:398
def check_crabtask(self)
Definition: CrabHelper.py:51
def pset_template_base_bath(self)
Definition: DTWorkflow.py:367
def write_pset_file(self)
Definition: DTWorkflow.py:384
def add_preselection(self)
Definition: DTWorkflow.py:121
def prepare_workflow(self)
Definition: DTWorkflow.py:88
#define str(s)
def add_local_custom_db(self)
Definition: DTWorkflow.py:184