CMS 3D CMS Logo

DTWorkflow.py
Go to the documentation of this file.
1 import os,sys
2 import glob
3 import logging
4 import argparse
5 import subprocess
6 import time, datetime
7 import urllib2
8 import json
9 
10 import tools
11 from CLIHelper import CLIHelper
12 from CrabHelper import CrabHelper
13 import FWCore.ParameterSet.Config as cms
14 log = logging.getLogger(__name__)
15 
17  """ This is the base class for all DTWorkflows and contains some
18  common tasks """
19  def __init__(self, options):
20  self.options = options
21  super( DTWorkflow, self ).__init__()
22  self.digilabel = "muonDTDigis"
23  # dict to hold required variables. Can not be marked in argparse to allow
24  # loading of options from config
29  # These variables are determined in the derived classes
30  self.pset_name = ""
32  self.output_files = []
33  self.input_files = []
34 
35  self.run_all_command = False
36  self.files_reveived = False
37  self._user = ""
38  # change to working directory
39  os.chdir(self.options.working_dir)
40 
def check_missing_options(self, requirements_dict):
    """Validate that every option required by the current command is set.

    An option counts as set when it exists on ``self.options`` and is either
    truthy or a boolean (so an explicit ``False`` flag is accepted).

    Args:
        requirements_dict: mapping from command name to the names of the
            options that command needs. Commands absent from the mapping
            need nothing.

    Raises:
        ValueError: listing every required option that is not set.
    """
    command = self.options.command
    if command in requirements_dict:
        required = requirements_dict[command]
    else:
        required = ()

    absent = object()

    def is_set(name):
        value = getattr(self.options, name, absent)
        return value is not absent and (bool(value) or isinstance(value, bool))

    missing = [name for name in required if not is_set(name)]
    if missing:
        raise ValueError("The following CLI options are missing for command %s: %s"
                         % (command, " ".join(missing)))
55 
56  def run(self):
57  """ Generalized function to run workflow command"""
58  msg = "Preparing %s workflow" % self.options.workflow
59  if hasattr(self.options, "command"):
60  msg += " for command %s" % self.options.command
61  log.info(msg)
62  if self.options.config_path:
63  self.load_options( self.options.config_path )
64  #check if all options to prepare the command are used
66  self.prepare_workflow()
67  # create output folder if they do not exist yet
68  if not os.path.exists( self.local_path ):
69  os.makedirs(self.local_path)
70  # dump used options
71  self.dump_options()
72  #check if all options to run the command are used
74  try:
75  run_function = getattr(self, self.options.command)
76  except AttributeError:
77  errmsg = "Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
78  if hasattr(self.options, "workflow_mode"):
79  errmsg += "and workflow mode %s" % self.options.workflow_mode
80  raise NotImplementedError( errmsg.format(self.__class__.__name__,
81  self.options.command))
82  log.debug("Running command %s" % self.options.command)
83  # call chosen function
84  run_function()
85 
def prepare_workflow(self):
    """Placeholder that concrete workflow classes must override.

    Raises:
        NotImplementedError: always, naming the class lacking the override.
    """
    class_name = self.__class__.__name__
    raise NotImplementedError(
        "Class `%s` does not implement `%s`" % (class_name, "prepare_workflow"))
91 
def all(self):
    """Run all commands of the current workflow mode one after another.

    The command sequence is read from ``self.all_commands``, which must be
    filled by the workflow-mode specific prepare function of a derived
    workflow. ``self.run_all_command`` is set to True before the chain
    starts; each command is stored in ``self.options.command`` and then
    executed through ``self.run()``.
    """
    self.run_all_command = True
    # Iterate the live sequence (no copy) exactly as it is stored.
    for step in self.all_commands:
        self.options.command = step
        self.run()
101 
def submit(self):
    """Submit the crab task of this workflow via ``submit_crab_task``."""
    submit_task = self.submit_crab_task
    submit_task()
104 
def check(self):
    """Check the status of the submitted tasks via ``check_crabtask``."""
    checker = self.check_crabtask
    checker()
108 
def write(self):
    """Execute the ``write`` step as a local cmsRun job (``runCMSSWtask``)."""
    run_local = self.runCMSSWtask
    run_local()
111 
def dump(self):
    """Execute the ``dump`` step as a local cmsRun job (``runCMSSWtask``)."""
    run_local = self.runCMSSWtask
    run_local()
114 
def correction(self):
    """Execute the ``correction`` step as a local cmsRun job (``runCMSSWtask``)."""
    run_local = self.runCMSSWtask
    run_local()
117 
def add_preselection(self):
    """Add the configured preselection sequence to ``self.process``.

    ``self.options.preselection`` is expected as ``"<config>:<sequence>"``.
    The first colon-separated field is passed to ``self.process.load`` and
    the second one to ``tools.prependPaths``.

    Raises:
        NameError: if ``self.process`` has not been initialised yet.
        ValueError: if the preselection string contains no ``:`` separator.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    preselection = self.options.preselection
    parts = preselection.split(':')
    # Previously a missing separator surfaced as an opaque IndexError.
    if len(parts) < 2:
        raise ValueError("Option preselection must have the form "
                         "<pathsequence>:<seqname>, got %r" % preselection)
    pathsequence, seqname = parts[0], parts[1]
    self.process.load(pathsequence)
    tools.prependPaths(self.process, seqname)
126 
127  def add_raw_option(self):
128  getattr(self.process, self.digilabel).inputLabel = 'rawDataCollector'
130 
def add_local_t0_db(self, local=False):
    """Register the sqlite t0 database given in ``options.inputT0DB``.

    Adds a ``t0DB`` source for record ``DTT0Rcd`` / tag ``t0`` via
    ``addPoolDBESSource`` and appends the absolute database path to
    ``self.input_files``.

    Args:
        local: True when the pset is run locally; the database is then
            referenced by absolute path. Otherwise (crab processing) only
            the file name is used in the connect string.
    """
    db_file = self.options.inputT0DB
    connect = os.path.abspath(db_file) if local else os.path.basename(db_file)
    self.addPoolDBESSource(process=self.process,
                           moduleName='t0DB',
                           record='DTT0Rcd',
                           tag='t0',
                           connect='sqlite_file:%s' % connect)
    self.input_files.append(os.path.abspath(self.options.inputT0DB))
145 
def add_local_vdrift_db(self, local=False):
    """Register the sqlite vdrift database given in ``options.inputVDriftDB``.

    Adds a ``vDriftDB`` source for record ``DTMtimeRcd`` / tag ``vDrift``
    via ``addPoolDBESSource`` and appends the absolute database path to
    ``self.input_files``.

    Args:
        local: True when the pset is run locally; the database is then
            referenced by absolute path. Otherwise (crab processing) only
            the file name is used in the connect string.
    """
    db_file = self.options.inputVDriftDB
    connect = os.path.abspath(db_file) if local else os.path.basename(db_file)
    self.addPoolDBESSource(process=self.process,
                           moduleName='vDriftDB',
                           record='DTMtimeRcd',
                           tag='vDrift',
                           connect='sqlite_file:%s' % connect)
    self.input_files.append(os.path.abspath(self.options.inputVDriftDB))
160 
def add_local_calib_db(self, local=False):
    """Register the sqlite ttrig database given in ``options.inputCalibDB``.

    Adds a ``calibDB`` source for record ``DTTtrigRcd`` / tag ``ttrig`` via
    ``addPoolDBESSource``; for ``Cosmics`` datasets the record label is
    ``cosmics``, otherwise it is empty. The absolute database path is
    appended to ``self.input_files``.

    Args:
        local: True when the pset is run locally; the database is then
            referenced by absolute path. Otherwise (crab processing) only
            the file name is used in the connect string.
    """
    label = 'cosmics' if self.options.datasettype == "Cosmics" else ''
    db_file = self.options.inputCalibDB
    connect = os.path.abspath(db_file) if local else os.path.basename(db_file)
    self.addPoolDBESSource(process=self.process,
                           moduleName='calibDB',
                           record='DTTtrigRcd',
                           tag='ttrig',
                           connect=str("sqlite_file:%s" % connect),
                           label=label)
    self.input_files.append(os.path.abspath(self.options.inputCalibDB))
180 
182  for option in ('inputDBRcd', 'connectStrDBTag'):
183  if hasattr(self.options, option) and not getattr(self.options, option):
184  raise ValueError("Option %s needed for custom input db" % option)
185  self.addPoolDBESSource( process = self.process,
186  record = self.options.inputDBRcd,
187  tag = self.options.inputDBTag,
188  connect = self.options.connectStrDBTag,
189  moduleName = 'customDB%s' % self.options.inputDBRcd
190  )
191 
193  """ Common operations used in most prepare_[workflow_mode]_submit functions"""
194  if not self.options.run:
195  raise ValueError("Option run is required for submission!")
196  if hasattr(self.options, "inputT0DB") and self.options.inputT0DB:
197  self.add_local_t0_db()
198 
199  if hasattr(self.options, "inputVDriftDB") and self.options.inputVDriftDB:
200  self.add_local_vdrift_db()
201 
202  if hasattr(self.options, "inputDBTag") and self.options.inputDBTag:
203  self.add_local_custom_db()
204 
205  if self.options.run_on_RAW:
206  self.add_raw_option()
207  if self.options.preselection:
208  self.add_preselection()
209 
def prepare_common_write(self, do_hadd=True):
    """Common operations used in most prepare_[workflow_mode]_write functions.

    Reloads the options dumped by the ``submit`` command and builds the crab
    task from ``self.crab_config_filepath``. The crab output is fetched into
    ``<local_path>/unmerged_results`` unless ``skip_stageout``, an earlier
    retrieval (``self.files_reveived``) or ``no_exec`` prevents it. The
    fetched files are then merged with hadd into
    ``<result_path>/<output_file>``.

    Args:
        do_hadd: merge the unmerged output files with hadd (default True).

    Returns:
        ``crabConfig.Data.outputDatasetTag`` of the crab task.

    Raises:
        RuntimeError: if hadd returns a non-zero exit code.
    """
    self.load_options_command("submit")
    output_path = os.path.join(self.local_path, "unmerged_results")
    merged_file = os.path.join(self.result_path, self.output_file)
    crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                           initUpdate=False)
    if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
        self.get_output_files(crabtask, output_path)
        log.info("Received files from storage element")
    if not self.options.no_exec and do_hadd:
        # Announce the merge only when it actually happens; previously this
        # message was logged even when hadd was skipped.
        log.info("Using hadd to merge output files")
        returncode = tools.haddLocal(output_path, merged_file)
        if returncode != 0:
            raise RuntimeError("Failed to merge files with hadd")
    return crabtask.crabConfig.Data.outputDatasetTag
226 
def prepare_common_dump(self, db_path):
    """Load the dump pset template and point it at an sqlite database.

    Loads ``self.pset_template`` into ``self.process`` and sets
    ``process.calibDB.connect`` to ``db_path``. It also sets the text output of
    ``process.dumpToFile`` to ``<result_path>/<db_path without extension>.txt``.
    If ``self.result_path`` cannot be determined, it falls back to the current
    working directory.

    Args:
        db_path: path of the sqlite file to dump.
    """
    self.process = tools.loadCmsProcess(self.pset_template)
    self.process.calibDB.connect = 'sqlite_file:%s' % db_path
    try:
        path = self.result_path
    except Exception:
        # Narrowed from a bare except: KeyboardInterrupt/SystemExit must
        # still propagate; any ordinary failure falls back to the cwd.
        path = os.getcwd()
    # Replaces a leftover Python-2-only debug print statement.
    log.debug("path %s", path)
    # NOTE(review): os.path.join discards `path` if db_path is absolute —
    # confirm whether the dump should then land next to the database.
    out_path = os.path.abspath(os.path.join(path,
                                            os.path.splitext(db_path)[0] + ".txt"))
    self.process.dumpToFile.outputFileName = out_path
239 
@staticmethod
def addPoolDBESSource(process,
                      moduleName,
                      record,
                      tag,
                      connect='sqlite_file:',
                      label=''):
    """Attach a PoolDBESSource for one record/tag to ``process`` and prefer it.

    The source is stored on ``process`` as ``moduleName``, and an ESPrefer
    named ``es_prefer_<moduleName>`` is added for it. For ``oracle:``
    connection strings the DB authentication path is set to
    ``/afs/cern.ch/cms/DB/conddb``.

    Args:
        process: the cms.Process to modify.
        moduleName: attribute name of the new ES source on ``process``.
        record: name of the record to read.
        tag: conditions tag to read.
        connect: connection string (default ``'sqlite_file:'``).
        label: untracked label of the record (default empty).
    """
    from CondCore.CondDB.CondDB_cfi import CondDB

    requested = cms.PSet(record=cms.string(record),
                         tag=cms.string(tag),
                         label=cms.untracked.string(label))
    source = cms.ESSource("PoolDBESSource",
                          CondDB,
                          timetype=cms.string('runnumber'),
                          toGet=cms.VPSet(requested))
    source.connect = cms.string(str(connect))
    if 'oracle:' in connect:
        source.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
    setattr(process, moduleName, source)
    preference = cms.ESPrefer('PoolDBESSource', moduleName)
    setattr(process, "es_prefer_" + moduleName, preference)
267 
def get_output_files(self, crabtask, output_path):
    """Fetch the output of ``crabtask`` into ``output_path``.

    Runs ``crab getoutput --outputpath <output_path> <task folder>``
    through ``self.crab.callCrabCommand``.
    """
    crab_args = ["getoutput", "--outputpath", output_path, crabtask.crabFolder]
    self.crab.callCrabCommand(crab_args)
273 
def runCMSSWtask(self, pset_path=""):
    """Run a cmsRun job locally.

    Args:
        pset_path: pset file to run. When empty, ``self.pset_path`` is used.

    Returns:
        0 without running anything if ``self.options.no_exec`` is set,
        otherwise the cmsRun return code (0, since failures raise).

    Raises:
        RuntimeError: if cmsRun exits with a non-zero code.
    """
    if self.options.no_exec:
        return 0
    # Honour the explicit argument; it used to be silently ignored.
    pset = pset_path or self.pset_path
    process = subprocess.Popen("cmsRun %s" % pset,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               shell=True)
    stdout = process.communicate()[0]
    log.info(stdout)
    if process.returncode != 0:
        # Name the pset that actually ran; keep pset_name for the default case.
        raise RuntimeError("Failed to use cmsRun for pset %s"
                           % (pset_path or self.pset_name))
    return process.returncode
288 
289  @property
290  def remote_out_path(self):
291  """ Output path on remote excluding user base path
292  Returns a dict if crab is used due to crab path setting policy"""
293  if self.options.command =="submit":
294  return {
295  "outLFNDirBase" : os.path.join( "/store",
296  "user",
297  self.user,
298  'DTCalibration/',
299  self.outpath_command_tag,
301  "outputDatasetTag" : self.tag
302  }
303  else:
304  return os.path.join( 'DTCalibration/',
305  datasetstr,
306  'Run' + str(self.options.run),
307  self.outpath_command_tag,
309  'v' + str(self.options.trial),
310  )
311  @property
313  if not self.options.workflow_mode in self.outpath_workflow_mode_dict:
314  raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
315  return self.outpath_workflow_mode_dict[self.options.workflow_mode]
316 
@property
def tag(self):
    """Task tag ``Run<run>_v<trial>`` built from the run and trial options."""
    return "Run%s_v%s" % (self.options.run, self.options.trial)
320 
@property
def user(self):
    """User name for remote paths, resolved once and cached in ``self._user``.

    Uses ``self.options.user`` when it is set and non-empty. Otherwise it
    asks ``self.crab.checkusername()``.
    """
    if not self._user:
        explicit = getattr(self.options, "user", None)
        self._user = explicit if explicit else self.crab.checkusername()
    return self._user
330 
331  @property
332  def local_path(self):
333  """ Output path on local machine """
334  if self.options.run and self.options.label:
335  prefix = "Run%d-%s_v%d" % ( self.options.run,
336  self.options.label,
337  self.options.trial)
338  else:
339  prefix = ""
341  path = os.path.join( self.options.working_dir,
342  prefix,
344  else:
345  path = os.path.join( self.options.working_dir,
346  prefix,
347  self.outpath_command_tag )
348  return path
349 
@property
def result_path(self):
    """Absolute ``<local_path>/results`` folder, created if missing."""
    folder = os.path.abspath(os.path.join(self.local_path, "results"))
    if os.path.exists(folder):
        return folder
    os.makedirs(folder)
    return folder
356 
357  @property
359  """ Base path to folder containing pset files for cmsRun"""
360  return os.path.expandvars(os.path.join("$CMSSW_BASE",
361  "src",
362  "CalibMuon",
363  "test",
364  )
365  )
366 
@property
def pset_path(self):
    """Full path of the pset file ``<local_path>/psets/<pset_name>``.

    The ``psets`` folder is created on first access.
    """
    pset_dir = os.path.join(self.local_path, "psets")
    if os.path.exists(pset_dir):
        pass
    else:
        os.makedirs(pset_dir)
    return os.path.join(pset_dir, self.pset_name)
374 
def write_pset_file(self):
    """Write ``self.process.dumpPython()`` to ``self.pset_path``.

    ``self.local_path`` is created first if it does not exist yet.

    Raises:
        NameError: if ``self.process`` has not been set.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    target_dir = self.local_path
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    # The file is opened before dumping, as in the original ordering.
    with open(self.pset_path, 'w') as handle:
        handle.write(self.process.dumpPython())
382 
def get_config_name(self, command=""):
    """Return the json options file name ``config_<command>.json``.

    ``command`` defaults to the current ``self.options.command``.
    """
    name = command or self.options.command
    return "".join(("config_", name, ".json"))
388 
def dump_options(self):
    """Write all current options as indented json.

    The target file is ``<local_path>/<get_config_name()>``.
    """
    target = os.path.join(self.local_path, self.get_config_name())
    with open(target, "w") as handle:
        json.dump(vars(self.options), handle, indent=4)
392 
def load_options(self, config_file_path):
    """Fill unset options from a json options file.

    Only options that are missing or falsy on ``self.options`` are taken
    from the file. Values that are already set are kept.

    Raises:
        IOError: if ``config_file_path`` does not exist.
    """
    if not os.path.exists(config_file_path):
        raise IOError("File %s not found" % config_file_path)
    with open(config_file_path, "r") as handle:
        stored = json.load(handle)
    for name, value in stored.items():
        if getattr(self.options, name, None):
            continue
        setattr(self.options, name, value)
401 
def load_options_command(self, command):
    """Load the options dumped by an earlier ``command`` of this workflow.

    Without a ``config_path`` option, the path is derived as
    ``<local_path>/<get_config_name(command)>`` and stored back in
    ``self.options.config_path``.

    Raises:
        RuntimeError: if neither ``config_path`` nor ``run`` is set.
        IOError: if the local run folder does not exist.
    """
    options = self.options
    if not options.config_path:
        if not options.run:
            raise RuntimeError("Option run is required if no config path specified")
        local_dir = self.local_path
        if not os.path.exists(local_dir):
            raise IOError("Local path %s does not exist" % local_dir)
        options.config_path = os.path.join(local_dir, self.get_config_name(command))
    self.load_options(options.config_path)
412 
def remote_out_path(self)
Definition: DTWorkflow.py:290
def add_local_vdrift_db(self, local=False)
Definition: DTWorkflow.py:146
def addPoolDBESSource(process, moduleName, record, tag, connect='sqlite_file:', label='')
Definition: DTWorkflow.py:246
def add_local_t0_db(self, local=False)
Definition: DTWorkflow.py:131
def get_config_name(self, command="")
Definition: DTWorkflow.py:383
def add_local_calib_db(self, local=False)
Definition: DTWorkflow.py:161
def load_options_command(self, command)
Definition: DTWorkflow.py:402
def outpath_workflow_mode_tag(self)
Definition: DTWorkflow.py:312
def prepare_common_write(self, do_hadd=True)
Definition: DTWorkflow.py:210
def haddLocal(localdir, result_file, extension='root')
Definition: tools.py:40
def fill_required_options_dict(self)
Definition: CLIHelper.py:39
def get_output_files(self, crabtask, output_path)
Definition: DTWorkflow.py:268
def check_missing_options(self, requirements_dict)
Definition: DTWorkflow.py:41
def crab_config_filepath(self)
Definition: CrabHelper.py:224
def prependPaths(process, seqname)
Definition: tools.py:65
def load_options(self, config_file_path)
Definition: DTWorkflow.py:393
def loadCmsProcess(psetPath)
Definition: tools.py:55
def submit_crab_task(self)
Definition: CrabHelper.py:20
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def __init__(self, options)
Definition: DTWorkflow.py:19
def add_raw_option(self)
Definition: DTWorkflow.py:127
def fill_required_options_prepare_dict(self)
Definition: CLIHelper.py:35
def runCMSSWtask(self, pset_path="")
Definition: DTWorkflow.py:274
def prepare_common_submit(self)
Definition: DTWorkflow.py:192
def prepare_common_dump(self, db_path)
Definition: DTWorkflow.py:227
def dump_options(self)
Definition: DTWorkflow.py:389
def check_crabtask(self)
Definition: CrabHelper.py:44
def pset_template_base_bath(self)
Definition: DTWorkflow.py:358
def write_pset_file(self)
Definition: DTWorkflow.py:375
def add_preselection(self)
Definition: DTWorkflow.py:118
def prepare_workflow(self)
Definition: DTWorkflow.py:86
#define str(s)
def add_local_custom_db(self)
Definition: DTWorkflow.py:181