CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
DTWorkflow.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from __future__ import absolute_import
3 import os,sys
4 import glob
5 import logging
6 import argparse
7 import subprocess
8 import time, datetime
9 import urllib2
10 import json
11 
12 from . import tools
13 from .CLIHelper import CLIHelper
14 from .CrabHelper import CrabHelper
15 import FWCore.ParameterSet.Config as cms
16 log = logging.getLogger(__name__)
17 
19  """ This is the base class for all DTWorkflows and contains some
20  common tasks """
21  def __init__(self, options):
22  self.options = options
23  super( DTWorkflow, self ).__init__()
24  self.digilabel = "muonDTDigis"
25  # dict to hold required variables. Can not be marked in argparse to allow
26  # loading of options from config
29  self.fill_required_options_dict()
30  self.fill_required_options_prepare_dict()
31  # These variables are determined in the derived classes
32  self.pset_name = ""
34  self.output_files = []
35  self.input_files = []
36 
37  self.run_all_command = False
38  self.files_reveived = False
39  self._user = ""
40  # change to working directory
41  os.chdir(self.options.working_dir)
42 
def check_missing_options(self, requirements_dict):
    """ Verify that all options required by the current command are set.

    requirements_dict maps a command name to the names of the options it
    needs. An option counts as provided when it exists on self.options and
    is either truthy or a bool (so an explicit False is accepted).
    Commands without an entry in requirements_dict are not checked.

    Raises ValueError listing every missing option.
    """
    command = self.options.command
    if command not in requirements_dict:
        return

    def is_provided(name):
        # Absent attribute -> missing; falsy non-bool value -> missing.
        if not hasattr(self.options, name):
            return False
        value = getattr(self.options, name)
        return bool(value) or isinstance(value, bool)

    absent = [name for name in requirements_dict[command]
              if not is_provided(name)]
    if absent:
        err = "The following CLI options are missing"
        err += " for command %s: " % command
        err += " ".join(absent)
        raise ValueError(err)
57 
58  def run(self):
59  """ Generalized function to run workflow command"""
60  msg = "Preparing %s workflow" % self.options.workflow
61  if hasattr(self.options, "command"):
62  msg += " for command %s" % self.options.command
63  log.info(msg)
64  if self.options.config_path:
65  self.load_options( self.options.config_path )
66  #check if all options to prepare the command are used
68  self.prepare_workflow()
69  # create output folder if they do not exist yet
70  if not os.path.exists( self.local_path ):
71  os.makedirs(self.local_path)
72  # dump used options
73  self.dump_options()
74  #check if all options to run the command are used
76  try:
77  run_function = getattr(self, self.options.command)
78  except AttributeError:
79  errmsg = "Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
80  if hasattr(self.options, "workflow_mode"):
81  errmsg += "and workflow mode %s" % self.options.workflow_mode
82  raise NotImplementedError( errmsg.format(self.__class__.__name__,
83  self.options.command))
84  log.debug("Running command %s" % self.options.command)
85  # call chosen function
86  run_function()
87 
def prepare_workflow(self):
    """ Abstract placeholder for the workflow preparation step.

    Always raises NotImplementedError naming the class on which it was
    called.
    """
    template = "Class `{}` does not implement `{}`"
    class_name = self.__class__.__name__
    raise NotImplementedError(template.format(class_name, "prepare_workflow"))
93 
def all(self):
    """ Run several workflow mode commands in a chain.

    Sets self.run_all_command, then for each entry of self.all_commands
    (in order) stores it in self.options.command and calls self.run().
    self.all_commands must be provided by the workflow mode specific
    prepare function of the child workflow object.
    """
    self.run_all_command = True
    for step in self.all_commands:
        self.options.command = step
        self.run()
103 
def submit(self):
    """ Handle the submit command by calling self.submit_crab_task(). """
    self.submit_crab_task()
106 
def check(self):
    """ Check the status of submitted tasks via self.check_crabtask(). """
    self.check_crabtask()
110 
def write(self):
    """ Handle the write command by calling self.runCMSSWtask(). """
    self.runCMSSWtask()
113 
def dump(self):
    """ Handle the dump command by calling self.runCMSSWtask(). """
    self.runCMSSWtask()
116 
def correction(self):
    """ Handle the correction command by calling self.runCMSSWtask(). """
    self.runCMSSWtask()
119 
def add_preselection(self):
    """ Add the preselection to the process object stored in the workflow.

    self.options.preselection is split at ':'. The first field is handed
    to self.process.load, the second to tools.prependPaths together with
    the process.

    Raises NameError if self.process has not been created yet.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    fields = self.options.preselection.split(':')
    pathsequence = fields[0]
    seqname = fields[1]
    self.process.load(pathsequence)
    tools.prependPaths(self.process, seqname)
128 
129  def add_raw_option(self):
130  getattr(self.process, self.digilabel).inputLabel = 'rawDataCollector'
132 
def add_local_t0_db(self, local=False):
    """ Add the local t0 database given by options.inputT0DB as input.

    With local=True (pset processed locally, not with crab) the sqlite
    connect string uses the absolute path of the file; otherwise only the
    file name is used. The absolute path is appended to self.input_files
    in both cases.
    """
    db_file = self.options.inputT0DB
    absolute = os.path.abspath(db_file)
    location = absolute if local else os.path.basename(db_file)
    self.addPoolDBESSource(process=self.process,
                           moduleName='t0DB',
                           record='DTT0Rcd',
                           tag='t0',
                           connect='sqlite_file:%s' % location)
    self.input_files.append(absolute)
147 
def add_local_vdrift_db(self, local=False):
    """ Add the local vdrift database given by options.inputVDriftDB as input.

    With local=True (pset processed locally, not with crab) the sqlite
    connect string uses the absolute path of the file; otherwise only the
    file name is used. The absolute path is appended to self.input_files
    in both cases.
    """
    db_file = self.options.inputVDriftDB
    absolute = os.path.abspath(db_file)
    location = absolute if local else os.path.basename(db_file)
    self.addPoolDBESSource(process=self.process,
                           moduleName='vDriftDB',
                           record='DTMtimeRcd',
                           tag='vDrift',
                           connect='sqlite_file:%s' % location)
    self.input_files.append(absolute)
162 
def add_local_calib_db(self, local=False):
    """ Add the local calib database given by options.inputCalibDB as input.

    With local=True (pset processed locally, not with crab) the sqlite
    connect string uses the absolute path of the file; otherwise only the
    file name is used. The record label is 'cosmics' when
    options.datasettype is "Cosmics" and empty otherwise. The absolute
    path is appended to self.input_files in both cases.
    """
    label = 'cosmics' if self.options.datasettype == "Cosmics" else ''
    db_file = self.options.inputCalibDB
    absolute = os.path.abspath(db_file)
    location = absolute if local else os.path.basename(db_file)
    self.addPoolDBESSource(process=self.process,
                           moduleName='calibDB',
                           record='DTTtrigRcd',
                           tag='ttrig',
                           connect=str("sqlite_file:%s" % location),
                           label=label)
    self.input_files.append(absolute)
182 
184  for option in ('inputDBRcd', 'connectStrDBTag'):
185  if hasattr(self.options, option) and not getattr(self.options, option):
186  raise ValueError("Option %s needed for custom input db" % option)
187  self.addPoolDBESSource( process = self.process,
188  record = self.options.inputDBRcd,
189  tag = self.options.inputDBTag,
190  connect = self.options.connectStrDBTag,
191  moduleName = 'customDB%s' % self.options.inputDBRcd
192  )
193 
195  """ Common operations used in most prepare_[workflow_mode]_submit functions"""
196  if not self.options.run:
197  raise ValueError("Option run is required for submission!")
198  if hasattr(self.options, "inputT0DB") and self.options.inputT0DB:
199  self.add_local_t0_db()
200 
201  if hasattr(self.options, "inputVDriftDB") and self.options.inputVDriftDB:
202  self.add_local_vdrift_db()
203 
204  if hasattr(self.options, "inputDBTag") and self.options.inputDBTag:
205  self.add_local_custom_db()
206 
207  if self.options.run_on_RAW:
208  self.add_raw_option()
209  if self.options.preselection:
210  self.add_preselection()
211 
def prepare_common_write(self, do_hadd=True):
    """ Common operations used in most prepare_[workflow_mode]_write functions.

    Calls load_options_command("submit"), builds a CrabTask from
    self.crab_config_filepath and, unless options.skip_stageout,
    self.files_reveived or options.no_exec is set, fetches the task output
    into '<local_path>/unmerged_results'. If do_hadd is true and
    options.no_exec is not set, the unmerged files are merged with
    tools.haddLocal into '<result_path>/<output_file>'.

    Returns the outputDatasetTag of the crab task configuration.
    Raises RuntimeError if the hadd merge returns a non-zero code.
    """
    # NOTE(review): the nesting below is reconstructed from a listing that
    # lost its indentation -- confirm against the repository version.
    self.load_options_command("submit")
    unmerged_dir = os.path.join(self.local_path, "unmerged_results")
    merged_target = os.path.join(self.result_path, self.output_file)
    task = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                       initUpdate=False)
    stageout_needed = not (self.options.skip_stageout
                           or self.files_reveived
                           or self.options.no_exec)
    if stageout_needed:
        self.get_output_files(task, unmerged_dir)
        log.info("Received files from storage element")
        log.info("Using hadd to merge output files")
    if do_hadd and not self.options.no_exec:
        if tools.haddLocal(unmerged_dir, merged_target) != 0:
            raise RuntimeError("Failed to merge files with hadd")
    return task.crabConfig.Data.outputDatasetTag
228 
def prepare_common_dump(self, db_path):
    """ Configure self.process to dump the sqlite file db_path to a text file.

    The process is loaded from self.pset_template with tools.loadCmsProcess,
    its calibDB source is pointed at db_path, and dumpToFile.outputFileName
    is set to db_path with its extension replaced by '.txt', joined onto
    self.result_path. If result_path cannot be determined, the current
    working directory is used instead.
    """
    self.process = tools.loadCmsProcess(self.pset_template)
    self.process.calibDB.connect = 'sqlite_file:%s' % db_path
    try:
        path = self.result_path
    except Exception:
        # Best-effort fallback kept from the original code. Catching
        # Exception instead of using a bare except lets KeyboardInterrupt
        # and SystemExit propagate.
        path = os.getcwd()
    print("path", path)
    txt_name = os.path.splitext(db_path)[0] + ".txt"
    out_path = os.path.abspath(os.path.join(path, txt_name))

    self.process.dumpToFile.outputFileName = out_path
241 
@staticmethod
def addPoolDBESSource( process,
                       moduleName,
                       record,
                       tag,
                       connect='sqlite_file:',
                       label='',):
    """ Attach a PoolDBESSource and a matching ESPrefer to process.

    The source is stored on process as attribute moduleName and serves one
    record/tag/label entry with timetype 'runnumber'; the ESPrefer is
    stored as 'es_prefer_' + moduleName. For connect strings containing
    'oracle:' the authentication path is set to /afs/cern.ch/cms/DB/conddb.
    """
    from CondCore.CondDB.CondDB_cfi import CondDB

    payload = cms.PSet(record = cms.string(record),
                       tag = cms.string(tag),
                       label = cms.untracked.string(label))
    source = cms.ESSource("PoolDBESSource",
                          CondDB,
                          timetype = cms.string('runnumber'),
                          toGet = cms.VPSet(payload),
                          )
    source.connect = cms.string( str(connect) )
    if 'oracle:' in connect:
        source.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
    setattr(process, moduleName, source)
    prefer_name = "es_prefer_" + moduleName
    setattr(process, prefer_name, cms.ESPrefer('PoolDBESSource', moduleName))
269 
def get_output_files(self, crabtask, output_path):
    """ Issue the crab 'getoutput' command for crabtask.

    The command is passed to self.crab.callCrabCommand with output_path as
    the '--outputpath' value and crabtask.crabFolder as the last argument.
    """
    arguments = ["getoutput", "--outputpath", output_path, crabtask.crabFolder]
    self.crab.callCrabCommand(arguments)
275 
def runCMSSWtask(self, pset_path=""):
    """ Run a cmsRun job locally and return its exit code.

    pset_path -- configuration file to run; the member variable
                 self.pset_path is used if the argument is not given.

    Returns 0 without running anything when options.no_exec is set,
    otherwise the (zero) return code of cmsRun.
    Raises RuntimeError if cmsRun cannot be started or exits non-zero.
    """
    if self.options.no_exec:
        return 0
    # Honour the documented argument; it used to be silently ignored.
    pset = pset_path or self.pset_path
    try:
        # Argument list instead of a shell string, so a path containing
        # spaces or shell metacharacters reaches cmsRun unchanged.
        process = subprocess.Popen(["cmsRun", pset],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
    except OSError as err:
        # Without a shell a missing executable surfaces as OSError; keep
        # reporting it as RuntimeError as callers saw before.
        raise RuntimeError("Failed to use cmsRun for pset %s: %s" % (pset, err))
    stdout = process.communicate()[0]
    log.info(stdout)
    if process.returncode != 0:
        raise RuntimeError("Failed to use cmsRun for pset %s" % pset)
    return process.returncode
290 
291  @property
292  def remote_out_path(self):
293  """ Output path on remote excluding user base path
294  Returns a dict if crab is used due to crab path setting policy"""
295  if self.options.command =="submit":
296  return {
297  "outLFNDirBase" : os.path.join( "/store",
298  "user",
299  self.user,
300  'DTCalibration/',
301  self.outpath_command_tag,
303  "outputDatasetTag" : self.tag
304  }
305  else:
306  return os.path.join( 'DTCalibration/',
307  datasetstr,
308  'Run' + str(self.options.run),
309  self.outpath_command_tag,
311  'v' + str(self.options.trial),
312  )
313  @property
315  if not self.options.workflow_mode in self.outpath_workflow_mode_dict:
316  raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
317  return self.outpath_workflow_mode_dict[self.options.workflow_mode]
318 
@property
def tag(self):
    """ Tag of the form 'Run<run>_v<trial>' built from the options. """
    run = str(self.options.run)
    trial = str(self.options.trial)
    return "".join(('Run', run, '_v', trial))
322 
@property
def user(self):
    """ User name, determined once and then cached in self._user.

    A non-empty options.user takes precedence; otherwise the name is
    obtained from self.crab.checkusername().
    """
    if not self._user:
        configured = getattr(self.options, "user", None)
        if configured:
            self._user = configured
        else:
            self._user = self.crab.checkusername()
    return self._user
332 
333  @property
334  def local_path(self):
335  """ Output path on local machine """
336  if self.options.run and self.options.label:
337  prefix = "Run%d-%s_v%d" % ( self.options.run,
338  self.options.label,
339  self.options.trial)
340  else:
341  prefix = ""
343  path = os.path.join( self.options.working_dir,
344  prefix,
346  else:
347  path = os.path.join( self.options.working_dir,
348  prefix,
349  self.outpath_command_tag )
350  return path
351 
@property
def result_path(self):
    """ Absolute path of '<local_path>/results'; created if it does not exist. """
    results_dir = os.path.join(self.local_path, "results")
    results_dir = os.path.abspath(results_dir)
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
    return results_dir
358 
359  @property
361  """ Base path to folder containing pset files for cmsRun"""
362  return os.path.expandvars(os.path.join("$CMSSW_BASE",
363  "src",
364  "CalibMuon",
365  "test",
366  )
367  )
368 
@property
def pset_path(self):
    """ Full path to the pset file inside '<local_path>/psets'.

    The 'psets' folder is created if it does not exist yet.
    """
    pset_dir = os.path.join(self.local_path, "psets")
    if not os.path.exists(pset_dir):
        os.makedirs(pset_dir)
    return os.path.join(pset_dir, self.pset_name)
376 
def write_pset_file(self):
    """ Write self.process.dumpPython() to the file at self.pset_path.

    self.local_path is created first if it does not exist.
    Raises NameError if self.process has not been created yet.
    """
    if not hasattr(self, "process"):
        raise NameError("Process is not initalized in workflow object")
    workdir = self.local_path
    if not os.path.exists(workdir):
        os.makedirs(workdir)
    with open(self.pset_path, 'w') as pset_file:
        pset_file.write(self.process.dumpPython())
384 
def get_config_name(self, command= ""):
    """ Name of the json file the options are dumped to: 'config_<command>.json'.

    self.options.command is used when command is empty.
    """
    selected = command or self.options.command
    return "config_" + selected + ".json"
390 
def dump_options(self):
    """ Write all attributes of self.options as json (indent 4) into
    self.local_path, using the file name given by get_config_name(). """
    target = os.path.join(self.local_path, self.get_config_name())
    with open(target, "w") as out_file:
        json.dump(vars(self.options), out_file, indent=4)
394 
def load_options(self, config_file_path):
    """ Fill self.options from the json file config_file_path.

    A value from the file is only applied when the option is absent from
    self.options or currently falsy, so values that are already set win.

    Raises IOError if the file does not exist.
    """
    if not os.path.exists(config_file_path):
        raise IOError("File %s not found" % config_file_path)
    with open(config_file_path, "r") as input_file:
        stored = json.load(input_file)
    for name, value in stored.items():
        if not getattr(self.options, name, None):
            setattr(self.options, name, value)
403 
def load_options_command(self, command ):
    """ Load the options stored for a previous command in the workflow.

    If options.config_path is not set it is derived as
    '<local_path>/<get_config_name(command)>' and stored back on the
    options; the file is then read with load_options().

    Raises RuntimeError if neither config_path nor run is set and IOError
    if self.local_path does not exist.
    """
    config_path = self.options.config_path
    if not config_path:
        if not self.options.run:
            raise RuntimeError("Option run is required if no config path specified")
        if not os.path.exists(self.local_path):
            raise IOError("Local path %s does not exist" % self.local_path)
        config_path = os.path.join(self.local_path,
                                   self.get_config_name(command))
        self.options.config_path = config_path
    self.load_options(config_path)
414 
def loadCmsProcess
Definition: tools.py:56
def haddLocal
Definition: tools.py:41
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def prependPaths
Definition: tools.py:66
#define str(s)