CMS 3D CMS Logo

DTWorkflow.py
Go to the documentation of this file.
1 from __future__ import print_function
2 import os,sys
3 import glob
4 import logging
5 import argparse
6 import subprocess
7 import time, datetime
8 import urllib2
9 import json
10 
11 import tools
12 from CLIHelper import CLIHelper
13 from CrabHelper import CrabHelper
14 import FWCore.ParameterSet.Config as cms
15 log = logging.getLogger(__name__)
16 
18  """ This is the base class for all DTWorkflows and contains some
19  common tasks """
20  def __init__(self, options):
# Stores the parsed options object, initialises bookkeeping attributes
# and finally changes the working directory to options.working_dir.
# NOTE(review): the embedded listing numbers jump 25 -> 30 and 31 -> 33,
# so statements may be missing from this view; confirm against the real file.
21  self.options = options
22  super( DTWorkflow, self ).__init__()
# Name of the digi module that add_raw_option looks up on self.process.
23  self.digilabel = "muonDTDigis"
24  # dict to hold required variables. Can not be marked in argparse to allow
25  # loading of options from config
30  # These variables are determined in the derived classes
31  self.pset_name = ""
33  self.output_files = []
# Filled by the add_local_*_db helpers with absolute paths of sqlite files.
34  self.input_files = []
35 
# Set to True by all() before it chains several commands.
36  self.run_all_command = False
# Spelling "reveived" is the name prepare_common_write reads; do not rename here alone.
37  self.files_reveived = False
# Cache used by the `user` property.
38  self._user = ""
39  # change to working directory
# os.chdir changes the current directory of the whole process.
40  os.chdir(self.options.working_dir)
41 
def check_missing_options(self, requirements_dict):
    """Verify that every option required by the current command is set.

    An option counts as provided when it exists on ``self.options`` and is
    either truthy or a boolean (so an explicit ``False`` flag is accepted).

    Args:
        requirements_dict: Mapping of command name to the option names
            that command requires.

    Raises:
        ValueError: If one or more required options are absent or empty.
    """
    command = self.options.command
    if command not in requirements_dict:
        # No requirements registered for this command.
        return

    def provided(name):
        if not hasattr(self.options, name):
            return False
        value = getattr(self.options, name)
        return bool(value) or isinstance(value, bool)

    absent = [name for name in requirements_dict[command] if not provided(name)]
    if absent:
        raise ValueError(
            "The following CLI options are missing"
            + " for command %s: " % self.options.command
            + " ".join(absent)
        )
56 
57  def run(self):
58  """ Generalized function to run workflow command"""
# Flow visible here: log a banner, optionally load options from
# options.config_path, call prepare_workflow(), make sure local_path exists,
# dump the options, then dispatch to the method named by options.command.
# NOTE(review): the embedded listing numbers skip 66 and 74 (right after the
# two "check if all options" comments); statements may be missing from this
# view - confirm against the real file.
59  msg = "Preparing %s workflow" % self.options.workflow
60  if hasattr(self.options, "command"):
61  msg += " for command %s" % self.options.command
62  log.info(msg)
63  if self.options.config_path:
64  self.load_options( self.options.config_path )
65  #check if all options to prepare the command are used
67  self.prepare_workflow()
68  # create output folder if they do not exist yet
69  if not os.path.exists( self.local_path ):
70  os.makedirs(self.local_path)
71  # dump used options
72  self.dump_options()
73  #check if all options to run the command are used
# The command name doubles as the name of the method that implements it.
75  try:
76  run_function = getattr(self, self.options.command)
77  except AttributeError:
# The %-substitution runs first; the two `{}` placeholders are filled by
# .format() in the raise below.
78  errmsg = "Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
79  if hasattr(self.options, "workflow_mode"):
# NOTE(review): no separating space before "and", so the final message reads
# "...for workflow <name>and workflow mode <mode>".
80  errmsg += "and workflow mode %s" % self.options.workflow_mode
81  raise NotImplementedError( errmsg.format(self.__class__.__name__,
82  self.options.command))
83  log.debug("Running command %s" % self.options.command)
84  # call chosen function
85  run_function()
86 
def prepare_workflow(self):
    """Placeholder that concrete workflows are expected to override.

    Raises:
        NotImplementedError: Always; the message names the concrete class.
    """
    class_name = self.__class__.__name__
    template = "Class `{}` does not implement `{}`"
    raise NotImplementedError(template.format(class_name, "prepare_workflow"))
92 
def all(self):
    """Run every command listed in ``self.all_commands``, in order.

    Sets ``self.run_all_command`` to True, then for each entry stores it in
    ``self.options.command`` and calls ``self.run()``. ``all_commands`` is
    expected to be provided by the workflow-mode specific prepare function
    of the child workflow.
    """
    self.run_all_command = True
    for step in self.all_commands:
        setattr(self.options, "command", step)
        self.run()
102 
def submit(self):
    """Handle the ``submit`` command by delegating to ``submit_crab_task``."""
    self.submit_crab_task()
105 
def check(self):
    """Handle the ``check`` command: query submitted tasks via ``check_crabtask``."""
    self.check_crabtask()
109 
def write(self):
    """Handle the ``write`` command by running the prepared pset with ``runCMSSWtask``."""
    self.runCMSSWtask()
112 
def dump(self):
    """Handle the ``dump`` command by running the prepared pset with ``runCMSSWtask``."""
    self.runCMSSWtask()
115 
def correction(self):
    """Handle the ``correction`` command by running the prepared pset with ``runCMSSWtask``."""
    self.runCMSSWtask()
118 
def add_preselection(self):
    """Load the preselection and prepend its sequence to the process paths.

    ``self.options.preselection`` is split on ``':'``; the first field is
    handed to ``self.process.load`` and the second to
    ``tools.prependPaths`` as the sequence name.

    Raises:
        NameError: If the workflow has no ``process`` attribute yet.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    fields = self.options.preselection.split(':')
    module_path = fields[0]
    sequence_name = fields[1]
    self.process.load(module_path)
    tools.prependPaths(self.process, sequence_name)
127 
128  def add_raw_option(self):
# Looks up the module named by self.digilabel on self.process and sets its
# inputLabel to 'rawDataCollector'.
# NOTE(review): the embedded listing numbers skip 130 directly after this
# statement; a line may be missing from this view - confirm against the real file.
129  getattr(self.process, self.digilabel).inputLabel = 'rawDataCollector'
131 
def add_local_t0_db(self, local=False):
    """Register the sqlite file ``options.inputT0DB`` as t0 source.

    Args:
        local: Set when the pset is processed locally rather than with
            crab. The connect string then carries the absolute file path;
            otherwise only the file's base name is used.

    The absolute path of the file is always appended to
    ``self.input_files``.
    """
    resolve = os.path.abspath if local else os.path.basename
    location = resolve(self.options.inputT0DB)
    self.addPoolDBESSource(
        process=self.process,
        moduleName='t0DB',
        record='DTT0Rcd',
        tag='t0',
        connect='sqlite_file:%s' % location,
    )
    self.input_files.append(os.path.abspath(self.options.inputT0DB))
146 
def add_local_vdrift_db(self, local=False):
    """Register the sqlite file ``options.inputVDriftDB`` as vdrift source.

    Args:
        local: Set when the pset is processed locally rather than with
            crab. The connect string then carries the absolute file path;
            otherwise only the file's base name is used.

    The absolute path of the file is always appended to
    ``self.input_files``.
    """
    resolve = os.path.abspath if local else os.path.basename
    location = resolve(self.options.inputVDriftDB)
    self.addPoolDBESSource(
        process=self.process,
        moduleName='vDriftDB',
        record='DTMtimeRcd',
        tag='vDrift',
        connect='sqlite_file:%s' % location,
    )
    self.input_files.append(os.path.abspath(self.options.inputVDriftDB))
161 
def add_local_calib_db(self, local=False):
    """Register the sqlite file ``options.inputCalibDB`` as ttrig source.

    The source label is ``'cosmics'`` when ``options.datasettype`` equals
    ``"Cosmics"`` and empty otherwise.

    Args:
        local: Set when the pset is processed locally rather than with
            crab. The connect string then carries the absolute file path;
            otherwise only the file's base name is used.

    The absolute path of the file is always appended to
    ``self.input_files``.
    """
    source_label = 'cosmics' if self.options.datasettype == "Cosmics" else ''
    resolve = os.path.abspath if local else os.path.basename
    location = resolve(self.options.inputCalibDB)
    self.addPoolDBESSource(
        process=self.process,
        moduleName='calibDB',
        record='DTTtrigRcd',
        tag='ttrig',
        connect=str("sqlite_file:%s" % location),
        label=source_label,
    )
    self.input_files.append(os.path.abspath(self.options.inputCalibDB))
181 
183  for option in ('inputDBRcd', 'connectStrDBTag'):
184  if hasattr(self.options, option) and not getattr(self.options, option):
185  raise ValueError("Option %s needed for custom input db" % option)
186  self.addPoolDBESSource( process = self.process,
187  record = self.options.inputDBRcd,
188  tag = self.options.inputDBTag,
189  connect = self.options.connectStrDBTag,
190  moduleName = 'customDB%s' % self.options.inputDBRcd
191  )
192 
194  """ Common operations used in most prepare_[workflow_mode]_submit functions"""
195  if not self.options.run:
196  raise ValueError("Option run is required for submission!")
197  if hasattr(self.options, "inputT0DB") and self.options.inputT0DB:
198  self.add_local_t0_db()
199 
200  if hasattr(self.options, "inputVDriftDB") and self.options.inputVDriftDB:
201  self.add_local_vdrift_db()
202 
203  if hasattr(self.options, "inputDBTag") and self.options.inputDBTag:
204  self.add_local_custom_db()
205 
206  if self.options.run_on_RAW:
207  self.add_raw_option()
208  if self.options.preselection:
209  self.add_preselection()
210 
211  def prepare_common_write(self, do_hadd=True):
212  """ Common operations used in most prepare_[workflow_mode]_erite functions"""
# Parameter do_hadd: when False the hadd merge step below is skipped.
# Returns crabtask.crabConfig.Data.outputDatasetTag.
# Raises RuntimeError when tools.haddLocal reports a non-zero return code.
# Re-use the options that were dumped by the "submit" command.
213  self.load_options_command("submit")
214  output_path = os.path.join( self.local_path, "unmerged_results" )
215  merged_file = os.path.join(self.result_path, self.output_file)
216  crabtask = self.crabFunctions.CrabTask(crab_config = self.crab_config_filepath,
217  initUpdate = False)
# Output retrieval is skipped when stage-out is disabled, when files were
# already fetched (files_reveived) or in no_exec mode.
218  if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
219  self.get_output_files(crabtask, output_path)
220  log.info("Received files from storage element")
# NOTE(review): indentation is lost in this listing, so it is unclear whether
# the next log call belongs to the block above - confirm against the real file.
221  log.info("Using hadd to merge output files")
222  if not self.options.no_exec and do_hadd:
223  returncode = tools.haddLocal(output_path, merged_file)
224  if returncode != 0:
225  raise RuntimeError("Failed to merge files with hadd")
226  return crabtask.crabConfig.Data.outputDatasetTag
227 
228  def prepare_common_dump(self, db_path):
# Loads the process from self.pset_template, points its calibDB source at
# the sqlite file db_path and sets dumpToFile.outputFileName to a .txt file
# named after db_path.
229  self.process = tools.loadCmsProcess(self.pset_template)
230  self.process.calibDB.connect = 'sqlite_file:%s' % db_path
# Fall back to the current working directory when result_path cannot be obtained.
# NOTE(review): the bare `except:` also swallows KeyboardInterrupt/SystemExit;
# consider narrowing it.
231  try:
232  path = self.result_path
233  except:
234  path = os.getcwd()
# NOTE(review): looks like leftover debugging output; the rest of the file uses `log`.
235  print("path", path)
# os.path.join discards `path` when db_path is absolute, so the .txt file is
# then written next to the database instead of into `path`.
236  out_path = os.path.abspath(os.path.join(path,
237  os.path.splitext(db_path)[0] + ".txt"))
238 
239  self.process.dumpToFile.outputFileName = out_path
240 
@staticmethod
def addPoolDBESSource( process,
                       moduleName,
                       record,
                       tag,
                       connect='sqlite_file:',
                       label='',):
    """Attach a ``PoolDBESSource`` and a matching ``ESPrefer`` to *process*.

    Args:
        process: Object the two modules are set on as attributes.
        moduleName: Attribute name for the source; the prefer statement is
            stored as ``"es_prefer_" + moduleName``.
        record: Record name placed in the ``toGet`` entry.
        tag: Tag name placed in the ``toGet`` entry.
        connect: Connection string; when it contains ``'oracle:'`` the
            authentication path is set to ``/afs/cern.ch/cms/DB/conddb``.
        label: Untracked label placed in the ``toGet`` entry.
    """
    from CondCore.CondDB.CondDB_cfi import CondDB

    payload = cms.PSet(
        record = cms.string(record),
        tag = cms.string(tag),
        label = cms.untracked.string(label)
    )
    source = cms.ESSource("PoolDBESSource",
                          CondDB,
                          timetype = cms.string('runnumber'),
                          toGet = cms.VPSet(payload),
                          )
    source.connect = cms.string( str(connect) )
    if 'oracle:' in connect:
        source.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
    setattr(process, moduleName, source)
    prefer = cms.ESPrefer('PoolDBESSource', moduleName)
    setattr(process, "es_prefer_" + moduleName, prefer)
268 
def get_output_files(self, crabtask, output_path):
    """Issue the crab ``getoutput`` command for *crabtask*.

    Args:
        crabtask: Task object; its ``crabFolder`` is passed to crab.
        output_path: Value passed after ``--outputpath``.
    """
    arguments = ["getoutput", "--outputpath", output_path, crabtask.crabFolder]
    self.crab.callCrabCommand(arguments)
274 
def runCMSSWtask(self, pset_path=""):
    """Run a cmsRun job locally and return its exit code.

    Args:
        pset_path: Path of the pset file handed to cmsRun. When empty, the
            member ``self.pset_path`` is used instead.

    Returns:
        0 when ``self.options.no_exec`` is set (nothing is executed) or
        when cmsRun finished successfully.

    Raises:
        RuntimeError: If cmsRun exits with a non-zero return code.
    """
    if self.options.no_exec:
        return 0
    # Honour the explicit argument; it used to be silently ignored even
    # though the docstring promised it would be used.
    pset_to_run = pset_path or self.pset_path
    process = subprocess.Popen("cmsRun %s" % pset_to_run,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               shell=True)
    # stderr is merged into stdout above, so one stream holds all output.
    stdout = process.communicate()[0]
    log.info(stdout)
    if process.returncode != 0:
        # Keep reporting pset_name for the default case; report the
        # explicit path when one was supplied.
        raise RuntimeError("Failed to use cmsRun for pset %s"
                           % (pset_path or self.pset_name))
    return process.returncode
289 
290  @property
291  def remote_out_path(self):
292  """ Output path on remote excluding user base path
293  Returns a dict if crab is used due to crab path setting policy"""
# For the "submit" command a dict with the keys "outLFNDirBase" and
# "outputDatasetTag" is returned; for every other command a plain path string.
# NOTE(review): the embedded listing numbers skip 301 and 309, so each of the
# two os.path.join calls may be missing an argument in this view; `datasetstr`
# is also not defined in the visible lines - confirm against the real file.
294  if self.options.command =="submit":
295  return {
296  "outLFNDirBase" : os.path.join( "/store",
297  "user",
298  self.user,
299  'DTCalibration/',
300  self.outpath_command_tag,
302  "outputDatasetTag" : self.tag
303  }
304  else:
305  return os.path.join( 'DTCalibration/',
306  datasetstr,
307  'Run' + str(self.options.run),
308  self.outpath_command_tag,
310  'v' + str(self.options.trial),
311  )
312  @property
314  if not self.options.workflow_mode in self.outpath_workflow_mode_dict:
315  raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
316  return self.outpath_workflow_mode_dict[self.options.workflow_mode]
317 
@property
def tag(self):
    """Tag string of the form ``Run<run>_v<trial>`` built from the options."""
    pieces = ('Run', str(self.options.run), '_v', str(self.options.trial))
    return ''.join(pieces)
321 
@property
def user(self):
    """User name, resolved once and cached in ``self._user``.

    A non-empty ``options.user`` takes precedence; otherwise the name is
    obtained from ``self.crab.checkusername()``.
    """
    if not self._user:
        configured = getattr(self.options, "user", None)
        self._user = configured if configured else self.crab.checkusername()
    return self._user
331 
332  @property
333  def local_path(self):
334  """ Output path on local machine """
# The prefix "Run<run>-<label>_v<trial>" is only built when both run and
# label are set; "%d" requires run and trial to be numbers.
335  if self.options.run and self.options.label:
336  prefix = "Run%d-%s_v%d" % ( self.options.run,
337  self.options.label,
338  self.options.trial)
339  else:
340  prefix = ""
# NOTE(review): the embedded listing numbers skip 341 and 344, so the
# condition that pairs with the `else` below and the end of the first
# os.path.join call are missing from this view - confirm against the real file.
342  path = os.path.join( self.options.working_dir,
343  prefix,
345  else:
346  path = os.path.join( self.options.working_dir,
347  prefix,
348  self.outpath_command_tag )
349  return path
350 
@property
def result_path(self):
    """Absolute path of the ``results`` folder below ``local_path``.

    The folder is created when it does not exist yet.
    """
    target = os.path.abspath(os.path.join(self.local_path, "results"))
    if os.path.exists(target):
        return target
    os.makedirs(target)
    return target
357 
358  @property
360  """ Base path to folder containing pset files for cmsRun"""
361  return os.path.expandvars(os.path.join("$CMSSW_BASE",
362  "src",
363  "CalibMuon",
364  "test",
365  )
366  )
367 
@property
def pset_path(self):
    """Full path to the pset file: ``<local_path>/psets/<pset_name>``.

    The ``psets`` folder is created when it does not exist yet.
    """
    pset_dir = os.path.join(self.local_path, "psets")
    if not os.path.exists(pset_dir):
        os.makedirs(pset_dir)
    return os.path.join(pset_dir, self.pset_name)
375 
def write_pset_file(self):
    """Write ``self.process.dumpPython()`` to the file at ``self.pset_path``.

    ``self.local_path`` is created first when it does not exist.

    Raises:
        NameError: If the workflow has no ``process`` attribute yet.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    if not os.path.exists(self.local_path):
        os.makedirs(self.local_path)
    with open(self.pset_path, 'w') as handle:
        handle.write(self.process.dumpPython())
383 
def get_config_name(self, command= ""):
    """Return the file name of the json options dump for *command*.

    Args:
        command: Command name; when empty, ``self.options.command`` is used.

    Returns:
        ``"config_<command>.json"``.
    """
    chosen = command or self.options.command
    return "".join(("config_", chosen, ".json"))
389 
def dump_options(self):
    """Write all attributes of ``self.options`` as indented json.

    The file is placed in ``self.local_path`` under the name returned by
    ``get_config_name()``.
    """
    destination = os.path.join(self.local_path, self.get_config_name())
    snapshot = vars(self.options)
    with open(destination, "w") as out_file:
        json.dump(snapshot, out_file, indent=4)
393 
def load_options(self, config_file_path):
    """Fill unset options from a json file.

    A value from the file is applied only when ``self.options`` lacks the
    attribute or its current value is falsy; options that are already set
    are left untouched.

    Args:
        config_file_path: Path of the json file to read.

    Raises:
        IOError: If *config_file_path* does not exist.
    """
    if not os.path.exists(config_file_path):
        raise IOError("File %s not found" % config_file_path)
    with open(config_file_path, "r") as input_file:
        stored = json.load(input_file)
    for name, value in stored.items():
        if getattr(self.options, name, None):
            continue
        setattr(self.options, name, value)
402 
403  def load_options_command(self, command ):
404  """Load options for previous command in workflow """
# Parameter command: name of the earlier command whose dumped json options
# should be loaded.
# When no config path was given, the path is derived from local_path and
# get_config_name(command); it is stored in options.config_path and then
# passed to load_options.
# Raises RuntimeError when neither config_path nor run is set, and IOError
# when local_path does not exist.
# NOTE(review): indentation is lost in this listing, so the nesting of the
# local_path check relative to the config_path check is not certain.
405  if not self.options.config_path:
406  if not self.options.run:
407  raise RuntimeError("Option run is required if no config path specified")
408  if not os.path.exists(self.local_path):
409  raise IOError("Local path %s does not exist" % self.local_path)
410  self.options.config_path = os.path.join(self.local_path,
411  self.get_config_name(command))
412  self.load_options( self.options.config_path )
413 
def remote_out_path(self)
Definition: DTWorkflow.py:291
def add_local_vdrift_db(self, local=False)
Definition: DTWorkflow.py:147
def addPoolDBESSource(process, moduleName, record, tag, connect='sqlite_file:', label='')
Definition: DTWorkflow.py:247
def add_local_t0_db(self, local=False)
Definition: DTWorkflow.py:132
def get_config_name(self, command="")
Definition: DTWorkflow.py:384
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:65
def add_local_calib_db(self, local=False)
Definition: DTWorkflow.py:162
def load_options_command(self, command)
Definition: DTWorkflow.py:403
def outpath_workflow_mode_tag(self)
Definition: DTWorkflow.py:313
def prepare_common_write(self, do_hadd=True)
Definition: DTWorkflow.py:211
def haddLocal(localdir, result_file, extension='root')
Definition: tools.py:41
def fill_required_options_dict(self)
Definition: CLIHelper.py:39
def get_output_files(self, crabtask, output_path)
Definition: DTWorkflow.py:269
def check_missing_options(self, requirements_dict)
Definition: DTWorkflow.py:42
def crab_config_filepath(self)
Definition: CrabHelper.py:225
def prependPaths(process, seqname)
Definition: tools.py:66
def load_options(self, config_file_path)
Definition: DTWorkflow.py:394
def loadCmsProcess(psetPath)
Definition: tools.py:56
def submit_crab_task(self)
Definition: CrabHelper.py:21
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def __init__(self, options)
Definition: DTWorkflow.py:20
def add_raw_option(self)
Definition: DTWorkflow.py:128
def fill_required_options_prepare_dict(self)
Definition: CLIHelper.py:35
def runCMSSWtask(self, pset_path="")
Definition: DTWorkflow.py:275
def prepare_common_submit(self)
Definition: DTWorkflow.py:193
def prepare_common_dump(self, db_path)
Definition: DTWorkflow.py:228
def dump_options(self)
Definition: DTWorkflow.py:390
def check_crabtask(self)
Definition: CrabHelper.py:45
def pset_template_base_bath(self)
Definition: DTWorkflow.py:359
def write_pset_file(self)
Definition: DTWorkflow.py:376
def add_preselection(self)
Definition: DTWorkflow.py:119
def prepare_workflow(self)
Definition: DTWorkflow.py:87
#define str(s)
def add_local_custom_db(self)
Definition: DTWorkflow.py:182