CMS 3D CMS Logo

submitPVValidationJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
4 usage:
5 
6 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
7 '''
8 
9 from __future__ import print_function
10 from builtins import range
11 
12 __author__ = 'Marco Musich'
13 __copyright__ = 'Copyright 2020, CERN CMS'
14 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
15 __license__ = 'Unknown'
16 __maintainer__ = 'Marco Musich'
17 __email__ = 'marco.musich@cern.ch'
18 __version__ = 1
19 
20 import datetime,time
21 import os,sys
22 import copy
23 import pickle
24 import string, re
25 import configparser as ConfigParser
26 import json
27 import pprint
28 import subprocess
29 from optparse import OptionParser
30 from subprocess import Popen, PIPE
31 import collections
32 import warnings
33 import shutil
34 import multiprocessing
35 from enum import Enum
36 
class RefitType(Enum):
    """Flavour of the track refit used by the PV validation templates."""
    STANDARD = 1
    COMMON = 2
40 
# Banner printed at startup and prepended to every generated cfg file.
CopyRights = (
    '##################################\n'
    '# submitPVValidationJobs.py #\n'
    '# marco.musich@cern.ch #\n'
    '# April 2020 #\n'
    '##################################\n'
)
46 
47 
def check_proxy():
    """Check if GRID proxy has been initialized.

    Returns True when ``voms-proxy-info --exists`` succeeds, False when the
    proxy is missing/expired or when the voms tools are not available at all.
    """
    # NOTE: the original 'def' line was lost in extraction and is restored here.
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout=dump, stderr=dump)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: no valid proxy; OSError: voms-proxy-info not installed
        return False
    return True
59 
60 
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.
    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    if check_proxy():
        # copy the local proxy next to the job as a hidden file
        source_path = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
        shutil.copyfile(source_path, os.path.join(rundir, ".user_proxy"))
    else:
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)
74 
75 
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Writes 'job.submit' file in `path` and returns its full path.
    Arguments:
    - `path`: job directory
    - `name`: stem of the per-job executable/log file names
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """
    submit_template = """\
universe = vanilla
requirements = (OpSysAndVer =?= "CentOS7")
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
queue {njobs:s}
"""
    # only forward the proxy when explicitly requested
    if proxy_path is not None:
        submit_template += """\
+x509userproxy = "{proxy:s}"
"""

    substitutions = dict(
        script=os.path.join(path, name + "_$(ProcId).sh"),
        out=name + "_$(ProcId)",
        jobm=os.path.abspath(path),
        flavour="tomorrow",
        njobs=str(nruns),
        proxy=proxy_path,
    )

    submit_file_path = os.path.join(path, "job_" + name + ".submit")
    with open(submit_file_path, "w") as handle:
        handle.write(submit_template.format(**substitutions))

    return submit_file_path
111 
112 
def getCommandOutput(command):
    """Run `command` through a shell and return its standard output.
    Arguments:
    - `command`: Shell command to be invoked by this function.
    """
    stream = os.popen(command)
    captured = stream.read()
    exit_status = stream.close()
    # close() returns None on success, otherwise the raw exit status
    if exit_status:
        print('%s failed w/ exit code %d' % (command, exit_status))
    return captured
125 
126 
def getFilesForRun(blob):
    """Query DAS for the files of one (run, dataset) pair.
    `blob` is ((run, dataset), instance); instance may be None for the default DAS instance.
    Returns the list of file LFNs (empty when the query yields nothing).
    """
    (run, dataset), instance = blob[0], blob[1]
    query = ' dasgoclient -limit=0 -query \'file run=' + run + ' dataset=' + dataset
    if instance is not None:
        query += ' instance=' + instance + '\''
    else:
        query += '\''
    proc = Popen(query, shell=True, stdout=PIPE, stderr=PIPE)
    stdout, stderr = proc.communicate()
    # drop the trailing empty entry produced by the final newline
    lfns = stdout.decode().split('\n')
    lfns.pop()
    return lfns
137 
138 
def getNEvents(run, dataset):
    """Return the number of events DAS reports for `run` of `dataset` (0 when DAS returns '[]')."""
    # BUG FIX: check_output returns bytes in python3, so the "[]\n" comparison
    # could never match and int() would reject the value; decode first.
    nEvents = subprocess.check_output(["das_client", "--limit", "0", "--query", "summary run={} dataset={} | grep summary.nevents".format(run, dataset)]).decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
143 
144 
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Query brilcalc for the per-run recorded luminosity in [minRun, maxRun].

    Expects something like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls | ncms | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73 | 327 | 142418 | 138935 | 19.562 | 18.036 |
    +-------+------+--------+--------+-------------------+------------------+
    And extracts the total recorded luminosity (/b).

    Returns a {run: lumi} dict (strings); empty when not run-based or when the
    BRIL query fails.
    """
    myCachedLumi = {}
    if(not isRunBased):
        return myCachedLumi

    try:
        output = subprocess.check_output(
            [homedir + "/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
             "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
             "--output-style", "csv", "-c", "web"])
    except Exception:
        # was a bare except; narrowed so Ctrl-C etc. are not swallowed.
        # brilcalc missing or DB unreachable: degrade gracefully to no lumi info.
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if(verbose):
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if ("#" not in line):
            # csv rows look like "run:fill,...,recorded"; cache run -> recorded
            runToCache = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if(verbose):
        print(myCachedLumi)
    return myCachedLumi
182 
183 
def isInJSON(run, jsonfile):
    """Return True if `run` appears as a key in the JSON lumi mask `jsonfile`.

    When the file cannot be opened or parsed, warn and accept every run.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        # was a bare except; narrowed so Ctrl-C etc. are not swallowed
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
192 
193 
def as_dict(config):
    """Flatten a ConfigParser-like `config` into a nested plain dict
    {section: {option: value}}."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
203 
204 
def to_bool(value):
    """
    Converts 'something' to boolean. Raises exception for invalid formats
    Possible True values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    text = str(value).lower()
    if text in ("yes", "y", "true", "t", "1"):
        return True
    if text in ("no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"):
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
215 
216 
def updateDB2():
    """Refresh the runInfo.pkl cache with start time and validity for every
    harvested root-files/Run*.root file.

    NOTE(review): runFromFilename, getRunStartTime and isValid are not defined
    in this file -- presumably provided elsewhere; confirm before relying on
    this function.
    """
    import glob  # BUG FIX: glob was never imported at module level
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            # BUG FIX: was infos["isValid"] = isValid(f), which overwrote a
            # single top-level key instead of flagging this run's entry
            infos[run]["isValid"] = isValid(f)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
234 
235 
def updateDB(run, runInfo):
    """Insert `runInfo` for `run` into the runInfo.pkl cache.
    An already-cached entry for the same run is kept unchanged."""
    dbName = "runInfo.pkl"
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)
    else:
        infos = {}

    # do not overwrite an existing record for this run
    infos.setdefault(run, runInfo)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
249 
250 
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser variant used by the alignment validation tools:
    case-sensitive option names plus helpers that overlay a matching
    'local<Section>' section on top of the requested one."""

    def optionxform(self, optionstr):
        # keep option names case-sensitive (the default lower-cases them)
        return optionstr


    def exists( self, section, option):
        """Return True if `option` is present in `section` (False for a
        missing section as well)."""
        try:
            items = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        for item in items:
            if item[0] == option:
                return True
        return False


    def __updateDict( self, dictionary, section ):
        """Copy all options of `section` into `dictionary`, then let the
        matching 'local<Section>' section (if any) override them."""
        result = dictionary
        try:
            for option in self.options( section ):
                result[option] = self.get( section, option )
            if "local"+section.title() in self.sections():
                for option in self.options( "local"+section.title() ):
                    result[option] = self.get( "local"+section.title(),option )
        # NOTE(review): 'as section' shadows the parameter, and msg is built
        # but unused because the raise below is commented out -- the error is
        # silently ignored.
        except ConfigParser.NoSectionError as section:
            msg = ("%s in configuration files. This section is mandatory."
                   %(str(section).replace(":", "", 1)))
            #raise AllInOneError(msg)
        return result


    def getResultingSection( self, section, defaultDict = {}, demandPars = [] ):
        """Return `defaultDict` updated with all options of `section`.

        Every option in `demandPars` is looked up first in `section` and then
        in the corresponding 'local...:'-prefixed section.
        NOTE(review): the mutable default arguments are shared across calls;
        defaultDict is deep-copied so this is harmless here, but demandPars
        must never be mutated by callers.
        """
        result = copy.deepcopy(defaultDict)
        for option in demandPars:
            try:
                result[option] = self.get( section, option )
            except ConfigParser.NoOptionError as globalSectionError:
                # recover the option name from the exception message
                globalSection = str( globalSectionError ).split( "'" )[-2]
                splittedSectionName = section.split( ":" )
                if len( splittedSectionName ) > 1:
                    localSection = ("local"+section.split( ":" )[0].title()+":"
                                    +section.split(":")[1])
                else:
                    localSection = ("local"+section.split( ":" )[0].title())
                if self.has_section( localSection ):
                    try:
                        result[option] = self.get( localSection, option )
                    # NOTE(review): 'as option' shadows the loop variable and
                    # msg is unused (raise commented out)
                    except ConfigParser.NoOptionError as option:
                        msg = ("%s. This option is mandatory."
                               %(str(option).replace(":", "", 1).replace(
                                   "section",
                                   "section '"+globalSection+"' or", 1)))
                        #raise AllInOneError(msg)
                else:
                    msg = ("%s. This option is mandatory."
                           %(str(globalSectionError).replace(":", "", 1)))
                    #raise AllInOneError(msg)
        result = self.__updateDict( result, section )
        #print(result)
        return result
313 
314 
def ConfigSectionMap(config, section):
    """Return {option: value} for every option of `section` in `config`.
    Options whose read fails are stored as None (and reported)."""
    the_dict = {}
    options = config.options(section)
    for option in options:
        try:
            the_dict[option] = config.get(section, option)
            # NOTE(review): config.get returns a string, so this branch looks
            # unreachable, and DebugPrint is not defined in this file; kept
            # for behavioural parity -- confirm before removing.
            if the_dict[option] == -1:
                DebugPrint("skip: %s" % option)
        except Exception:
            # was a bare except; narrowed so Ctrl-C etc. are not swallowed
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
327 
328 
def mkdir_eos(out_path):
    """Create `out_path` on EOS one level at a time (only below 'test_out'),
    then list it back to check that the directory exists."""
    print("creating", out_path)
    eos_select = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select"

    newpath = '/'
    for piece in out_path.split('/'):
        newpath = os.path.join(newpath, piece)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            proc = subprocess.Popen(eos_select + " mkdir " + newpath, shell=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = proc.communicate()
            proc.wait()

    # now check that the directory exists
    checker = subprocess.Popen(eos_select + " ls " + out_path, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = checker.communicate()
    checker.wait()
    if checker.returncode != 0:
        print(out)
350 
def split(sequence, size):
    """Yield successive `size`-sized slices of `sequence`."""
    yield from (sequence[pos:pos + size]
                for pos in range(0, len(sequence), size))
359 
360 
class Job:
    """One PV validation batch job: holds the full configuration needed to
    generate the cmsRun cfg file, the LSF/bash wrapper scripts, and to submit
    them to the batch system."""

    def __init__(self,dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir ,the_dir):
        """Store the job configuration; `dataset` is a DAS dataset path,
        the remaining arguments are substituted verbatim into the cfg
        template by createTheCfgFile()."""

        # e.g. /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO
        #      -> HLTPhysics_Run2016C
        theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None   # set by submit() from the bsub reply
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir=the_dir
        self.CMSSW_dir=CMSSW_dir

        self.output_full_name=self.getOutputBaseName()+"_"+str(self.job_id)
        self.output_number_name=self.getOutputBaseNameWithData()+"_"+str(self.job_number)

        self.cfg_dir=None
        self.outputCfgName=None

        # LSF variables
        self.LSF_dir=None
        self.BASH_dir=None
        self.output_LSF_name=None
        self.output_BASH_name=None

        self.lfn_list=list()

    def __del__(self):
        del self.lfn_list

    def setEOSout(self,theEOSdir):
        # EOS directory where the job output will be copied back
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Base name of the output files for this validation job."""
        return "PVValidation_"+self.job_name

    def getOutputBaseNameWithData(self):
        """Base name of the output files, including the dataset label."""
        return "PVValidation_"+self.job_name+"_"+self.data

    def createTheCfgFile(self,lfn):
        """Generate the cmsRun configuration for this job by substituting the
        *TEMPLATE placeholders in PVValidation_T_cfg.py; `lfn` is the list of
        input file LFNs."""

        global CopyRights
        # write the cfg file

        self.cfg_dir = os.path.join(self.the_dir,"cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName=self.output_full_name+"_cfg.py"
        fout=open(os.path.join(self.cfg_dir,self.outputCfgName),'w')

        template_cfg_file = os.path.join(self.the_dir,"PVValidation_T_cfg.py")

        fin = open(template_cfg_file)

        config_txt = '\n\n' + CopyRights + '\n\n'
        config_txt += fin.read()

        config_txt=config_txt.replace("ISDATEMPLATE",self.isDA)
        config_txt=config_txt.replace("ISMCTEMPLATE",self.isMC)
        config_txt=config_txt.replace("APPLYBOWSTEMPLATE",self.applyBOWS)
        config_txt=config_txt.replace("EXTRACONDTEMPLATE",self.applyEXTRACOND)
        config_txt=config_txt.replace("USEFILELISTTEMPLATE","True")
        config_txt=config_txt.replace("RUNBOUNDARYTEMPLATE",self.runboundary)
        config_txt=config_txt.replace("LUMILISTTEMPLATE",self.lumilist)
        config_txt=config_txt.replace("MAXEVENTSTEMPLATE",self.maxevents)
        config_txt=config_txt.replace("GLOBALTAGTEMPLATE",self.gt)
        config_txt=config_txt.replace("ALLFROMGTTEMPLATE",self.allFromGT)
        config_txt=config_txt.replace("ALIGNOBJTEMPLATE",self.alignmentDB)
        config_txt=config_txt.replace("GEOMTAGTEMPLATE",self.alignmentTAG)
        config_txt=config_txt.replace("APEOBJTEMPLATE",self.apeDB)
        config_txt=config_txt.replace("ERRORTAGTEMPLATE",self.apeTAG)
        config_txt=config_txt.replace("BOWSOBJECTTEMPLATE",self.bowDB)
        config_txt=config_txt.replace("BOWSTAGTEMPLATE",self.bowTAG)
        config_txt=config_txt.replace("VERTEXTYPETEMPLATE",self.vertextype)
        config_txt=config_txt.replace("TRACKTYPETEMPLATE",self.tracktype)
        config_txt=config_txt.replace("REFITTERTEMPLATE",self.refittertype)
        config_txt=config_txt.replace("TTRHBUILDERTEMPLATE",self.ttrhtype)
        config_txt=config_txt.replace("PTCUTTEMPLATE",self.ptcut)
        config_txt=config_txt.replace("INTLUMITEMPLATE",self.intlumi)
        config_txt=config_txt.replace("RUNCONTROLTEMPLATE",self.applyruncontrol)
        lfn_with_quotes = map(lambda x: "\'"+x+"\'",lfn)
        config_txt=config_txt.replace("FILESOURCETEMPLATE","["+",".join(lfn_with_quotes)+"]")
        config_txt=config_txt.replace("OUTFILETEMPLATE",self.output_full_name+".root")

        fout.write(config_txt)

        # NOTE(review): fin.read() above already consumed the whole template,
        # so this readlines() yields nothing and the extra-conditions block
        # below is never written; a fin.seek(0) would be needed -- confirm.
        for line in fin.readlines():

            if 'END OF EXTRA CONDITIONS' in line:
                for element in self.extraCondVect :
                    if("Rcd" in element):
                        params = self.extraCondVect[element].split(',')

                        fout.write(" \n")
                        fout.write(" process.conditionsIn"+element+"= CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone( \n")
                        fout.write(" connect = cms.string('"+params[0]+"'), \n")
                        fout.write(" toGet = cms.VPSet(cms.PSet(record = cms.string('"+element+"'), \n")
                        fout.write(" tag = cms.string('"+params[1]+"'), \n")
                        if (len(params)>2):
                            fout.write(" label = cms.untracked.string('"+params[2]+"') \n")
                        fout.write(" ) \n")
                        fout.write(" ) \n")
                        fout.write(" ) \n")
                        # NOTE(review): element[0] is only the FIRST CHARACTER of
                        # the record name; the full `element` looks intended -- confirm.
                        fout.write(" process.prefer_conditionsIn"+element+" = cms.ESPrefer(\"PoolDBESSource\", \"conditionsIn"+element[0]+"\") \n \n")
            fout.write(line)
        fout.close()

    def createTheLSFFile(self):
        """Write the LSF (bsub) submission script for this job."""

        # directory to store the LSF to be submitted
        self.LSF_dir = os.path.join(self.the_dir,"LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name=self.output_full_name+".lsf"
        fout=open(os.path.join(self.LSF_dir,self.output_LSF_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/sh \n")
        fout.write("#BSUB -L /bin/sh\n")
        fout.write("#BSUB -J "+job_name+"\n")
        fout.write("#BSUB -o "+os.path.join(log_dir,job_name+".log")+"\n")
        fout.write("#BSUB -q cmscaf1nd \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=`pwd` \n")
        fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("eval `scram runtime -sh` \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cmsRun "+os.path.join(self.cfg_dir,self.outputCfgName)+" \n")
        fout.write("ls -lh . \n")
        # stage out every root/txt file produced by the job to EOS
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

        fout.close()


    def createTheBashFile(self):
        """Write the bash wrapper script executed by HTCondor for this job."""

        # directory to store the BASH to be submitted
        self.BASH_dir = os.path.join(self.the_dir,"BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name=self.output_number_name+".sh"
        fout=open(os.path.join(self.BASH_dir,self.output_BASH_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/bash \n")
        #fout.write("export EOS_MGM_URL=root://eoscms.cern.ch \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("echo \"Job started at \" `date` \n")
        fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir,"src")+" \n")
        # the proxy forwarded by forward_proxy() is picked up from the test dir
        fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=$PWD \n")
        #fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("cd ${CMSSW_DIR} \n")
        fout.write("eval `scramv1 runtime -sh` \n")
        fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cp "+os.path.join(self.cfg_dir,self.outputCfgName)+" . \n")
        fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
        fout.write("cmsRun "+self.outputCfgName+" \n")
        fout.write("echo \"Content of working dir is \"`ls -lh` \n")
        #fout.write("less condor_exec.exe \n")
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        #fout.write("mv ${JobName}.out ${CMSSW_DIR}/BASH \n")
        fout.write("echo \"Job ended at \" `date` \n")
        fout.write("exit 0 \n")

        fout.close()

    def getOutputFileName(self):
        """Full EOS path of the root file this job will produce."""
        return os.path.join(self.OUTDIR,self.output_full_name+".root")

    def submit(self):
        """Make the LSF script executable and submit it with bsub, keeping
        the scheduler's reply for getBatchjobId()."""
        print("submit job", self.job_id)
        job_name = self.output_full_name
        submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir,self.output_LSF_name)
        child1 = os.system(submitcommand1)
        #submitcommand2 = "bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name)
        #child2 = os.system(submitcommand2)
        self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name))

    def getBatchjobId(self):
        """Extract the numeric id from the 'Job <id> is submitted ...' bsub reply."""
        return self.batch_job_id.split("<")[1].split(">")[0]
597 
598 
599 def main():
601 
602 
603  if not check_proxy():
604  print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
605  sys.exit(1)
606 
607 
608  forward_proxy(".")
609 
610  global CopyRights
611  print('\n'+CopyRights)
612 
613  HOME = os.environ.get('HOME')
615  # CMSSW section
616  input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
617  AnalysisStep_dir = os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test")
618  lib_path = os.path.abspath(AnalysisStep_dir)
619  sys.path.append(lib_path)
620 
621 
622  srcFiles = []
624  desc="""This is a description of %prog."""
625  parser = OptionParser(description=desc,version='%prog version 0.1')
626  parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
627  parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
628  parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
629  parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
630  parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
631  parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
632  parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
633  parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
634  parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
635  parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
636  (opts, args) = parser.parse_args()
638  now = datetime.datetime.now()
639  #t = now.strftime("test_%Y_%m_%d_%H_%M_%S_")
640  #t = "2016UltraLegacy"
641  #t = "2017UltraLegacy"
642  #t = "2018UltraLegacy"
643  t=""
644  t+=opts.taskname
645 
646  USER = os.environ.get('USER')
647  eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
648 
649  if opts.submit:
650  mkdir_eos(eosdir)
651  else:
652  print("Not going to create EOS folder. -s option has not been chosen")
653 
654 
655 
656  jobName = []
657  isMC = []
658  isDA = []
659  doRunBased = []
660  maxevents = []
661 
662  gt = []
663  allFromGT = []
664  applyEXTRACOND = []
665  extraCondVect = []
666  alignmentDB = []
667  alignmentTAG = []
668  apeDB = []
669  apeTAG = []
670  applyBOWS = []
671  bowDB = []
672  bowTAG = []
673  conditions = []
674 
675  vertextype = []
676  tracktype = []
677  refittertype = []
678  ttrhtype = []
679 
680  applyruncontrol = []
681  ptcut = []
682  runboundary = []
683  lumilist = []
684 
685  ConfigFile = opts.inputconfig
686 
687  if ConfigFile is not None:
688 
689  print("********************************************************")
690  print("* Parsing from input file:", ConfigFile," ")
691 
692  config = BetterConfigParser()
693  config.read(ConfigFile)
694 
695  print("Parsed the following configuration \n\n")
696  inputDict = as_dict(config)
697  pprint.pprint(inputDict)
698 
699  if(not bool(inputDict)):
700  raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
701 
702  #print config.sections()
703 
704  # please notice: since in principle one wants to run on several different samples simultaneously,
705  # all these inputs are vectors
706 
707  doRunBased = opts.doRunBased
708 
709  listOfValidations = config.getResultingSection("validations")
710 
711  for item in listOfValidations:
712  if (bool(listOfValidations[item]) == True):
713 
714  jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
715  isDA.append(ConfigSectionMap(config,"Job")['isda'])
716  isMC.append(ConfigSectionMap(config,"Job")['ismc'])
717  maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
718 
719  gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
720  allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
721  applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
722  conditions.append(config.getResultingSection("ExtraConditions"))
723 
724  alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
725  alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
726  apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
727  apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
728  applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
729  bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
730  bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
731 
732  vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
733  tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
734 
735 
736 
737  if(config.exists("Refit","refittertype")):
738  refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
739  else:
740  refittertype.append(str(RefitType.COMMON))
741 
742  if(config.exists("Refit","ttrhtype")):
743  ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
744  else:
745  ttrhtype.append("WithAngleAndTemplate")
746 
747  applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
748  ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
749  runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
750  lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
751  else :
752 
753  print("********************************************************")
754  print("* Parsing from command line *")
755  print("********************************************************")
756 
757  jobName = ['testing']
758  isDA = ['True']
759  isMC = ['True']
760  doRunBased = opts.doRunBased
761  maxevents = ['10000']
762 
763  gt = ['74X_dataRun2_Prompt_v4']
764  allFromGT = ['False']
765  applyEXTRACOND = ['False']
766  conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
767  alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
768  alignmentTAG = ['TrackerAlignment_Prompt']
769  apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
770  apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
771  applyBOWS = ['True']
772  bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
773  bowTAG = ['TrackerSurafceDeformations_v1_express']
774 
775  vertextype = ['offlinePrimaryVertices']
776  tracktype = ['ALCARECOTkAlMinBias']
777 
778  applyruncontrol = ['False']
779  ptcut = ['3']
780  runboundary = ['1']
781  lumilist = ['']
782 
783  # print some of the configuration
784 
785  print("********************************************************")
786  print("* Configuration info *")
787  print("********************************************************")
788  print("- submitted : ",opts.submit)
789  print("- taskname : ",opts.taskname)
790  print("- Jobname : ",jobName)
791  print("- use DA : ",isDA)
792  print("- is MC : ",isMC)
793  print("- is run-based: ",doRunBased)
794  print("- evts/job : ",maxevents)
795  print("- GlobatTag : ",gt)
796  print("- allFromGT? : ",allFromGT)
797  print("- extraCond? : ",applyEXTRACOND)
798  print("- extraCond : ",conditions)
799  print("- Align db : ",alignmentDB)
800  print("- Align tag : ",alignmentTAG)
801  print("- APE db : ",apeDB)
802  print("- APE tag : ",apeTAG)
803  print("- use bows? : ",applyBOWS)
804  print("- K&B db : ",bowDB)
805  print("- K&B tag : ",bowTAG)
806  print("- VertexColl : ",vertextype)
807  print("- TrackColl : ",tracktype)
808  print("- RefitterSeq : ",refittertype)
809  print("- TTRHBuilder : ",ttrhtype)
810  print("- RunControl? : ",applyruncontrol)
811  print("- Pt> ",ptcut)
812  print("- run= ",runboundary)
813  print("- JSON : ",lumilist)
814  print("- Out Dir : ",eosdir)
815 
816  print("********************************************************")
817  print("Will run on",len(jobName),"workflows")
818 
819  myRuns = []
820  mylist = {}
822  if(doRunBased):
823  print(">>>> This is Data!")
824  print(">>>> Doing run based selection")
825  cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
826  p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
827  out, err = p.communicate()
828  #print(out)
829  listOfRuns=out.decode().split("\n")
830  listOfRuns.pop()
831  listOfRuns.sort()
832  print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
833 
834  mytuple=[]
836  print("first run:",opts.start,"last run:",opts.end)
837 
838  for run in listOfRuns:
839  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
840  print("excluding",run)
841  continue
842 
843  if not isInJSON(run,lumilist[0]):
844  continue
845 
846  else:
847  print("'======> taking",run)
848  #print "preparing run",run
849  #if(int(run)%100==0):
850  mytuple.append((run,opts.data))
851 
852  #print mytuple
853 
854  instances=[opts.instance for entry in mytuple]
855  pool = multiprocessing.Pool(processes=20) # start 20 worker processes
856  count = pool.map(getFilesForRun,zip(mytuple,instances))
857  file_info = dict(zip(listOfRuns, count))
859  #print file_info
860 
861  for run in listOfRuns:
862  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
863  print('rejecting run',run,' becasue outside of boundaries')
864  continue
865 
866  if not isInJSON(run,lumilist[0]):
867  print('rejecting run',run,' becasue outside not in JSON')
868  continue
869 
870  #if(int(run)%100==0):
871  # print "preparing run",run
872  myRuns.append(run)
873  #cmd2 = ' das_client --limit=0 --query \'file run='+run+' dataset='+opts.data+'\''
874  #q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
875  #out2, err2 = q.communicate()
876 
877  #out2=getFilesForRun((run,opts.data))
878  #print out2
879  #pool.map(getFilesForRun,run,opts.data)
880 
881 
882  #if run in file_info:
883  #mylist[run] = file_info[run]
884  #print run,mylist[run]
885  #mylist[run] = out2.split('\n')
886  #print mylist
887  #mylist[run].pop()
888  #print mylist
889 
890  od = collections.OrderedDict(sorted(file_info.items()))
891  # print od
892 
893 
894  if(len(myRuns)==0):
895  if(opts.isUnitTest):
896  print('\n')
897  print('=' * 70)
898  print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
899  print('=' * 70)
900  print('\n')
901  sys.exit(0)
902  else:
903  raise Exception('Will not run on any run.... please check again the configuration')
904  else:
905  # get from the DB the int luminosities
906  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
908  if(opts.verbose):
909  pprint.pprint(myLumiDB)
910 
911  # start loop on samples
912  for iConf in range(len(jobName)):
913  print("This is Task n.",iConf+1,"of",len(jobName))
914 
915 
916 
917  # for hadd script
918  scripts_dir = os.path.join(AnalysisStep_dir,"scripts")
919  if not os.path.exists(scripts_dir):
920  os.makedirs(scripts_dir)
921  hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
922  fout = open(hadd_script_file,'w')
923 
924  output_file_list1=list()
925  output_file_list2=list()
926  output_file_list2.append("hadd ")
927 
928  inputFiles = []
930  if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
931  if(to_bool(isMC[iConf])):
932  print("this is MC")
933  cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
934  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
935  out,err = s.communicate()
936  mylist = out.decode().split('\n')
937  mylist.pop()
938  #print mylist
939 
940  splitList = split(mylist,10)
941  for files in splitList:
942  inputFiles.append(files)
943  myRuns.append(str(1))
944  else:
945  print("this is DATA (not doing full run-based selection)")
946  print(runboundary[iConf])
947  cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
948  #print cmd
949  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
950  out,err = s.communicate()
951  #print(out)
952  mylist = out.decode().split('\n')
953  mylist.pop()
954  #print "len(mylist):",len(mylist)
955  print("mylist:",mylist)
956 
957  splitList = split(mylist,10)
958  for files in splitList:
959  inputFiles.append(files)
960  myRuns.append(str(runboundary[iConf]))
961 
962  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)
963 
964  else:
965  #pass
966  for element in od:
967  #print mylist[element]
968  inputFiles.append(od[element])
969  #print element,od[element]
970  #print mylist
971 
972  #print inputFiles
973 
974 
975  batchJobIds = []
976  mergedFile = None
978  if(opts.verbose):
979  print("myRuns =====>",myRuns)
980 
981  totalJobs=0
982  theBashDir=None
983  theBaseName=None
985  for jobN,theSrcFiles in enumerate(inputFiles):
986  if(opts.verbose):
987  print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
988  else:
989  print("JOB:",jobN,"run",myRuns[jobN])
990  thejobIndex=None
991  theLumi='1'
993  #if(to_bool(isMC[iConf]) and (not to_bool(doRunBased))):
994  if(to_bool(isMC[iConf])):
995  thejobIndex=jobN
996  else:
997  if(doRunBased):
998  thejobIndex=myRuns[jobN]
999  else:
1000  thejobIndex=myRuns[jobN]+"_"+str(jobN)
1001 
1002  if (myRuns[jobN]) in myLumiDB:
1003  theLumi = myLumiDB[myRuns[jobN]]
1004  else:
1005  print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
1006  theLumi='1'
1007  print("int. lumi:",theLumi,"/pb")
1008 
1009  #print 'the configuration is:',iConf,' theJobIndex is:',thejobIndex
1010  #print applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf]
1011 
1012  runInfo = {}
1013  runInfo["run"] = myRuns[jobN]
1014  #runInfo["runevents"] = getNEvents(myRuns[jobN],opts.data)
1015  runInfo["conf"] = jobName[iConf]
1016  runInfo["gt"] = gt[iConf]
1017  runInfo["allFromGT"] = allFromGT[iConf]
1018  runInfo["alignmentDB"] = alignmentDB[iConf]
1019  runInfo["alignmentTag"] = alignmentTAG[iConf]
1020  runInfo["apeDB"] = apeDB[iConf]
1021  runInfo["apeTag"] = apeTAG[iConf]
1022  runInfo["applyBows"] = applyBOWS[iConf]
1023  runInfo["bowDB"] = bowDB[iConf]
1024  runInfo["bowTag"] = bowTAG[iConf]
1025  runInfo["ptCut"] = ptcut[iConf]
1026  runInfo["lumilist"] = lumilist[iConf]
1027  runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
1028  runInfo["conditions"] = conditions[iConf]
1029  runInfo["nfiles"] = len(theSrcFiles)
1030  runInfo["srcFiles"] = theSrcFiles
1031  runInfo["intLumi"] = theLumi
1032 
1033  updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1034 
1035  totalJobs=totalJobs+1
1036 
1037  aJob = Job(opts.data,
1038  jobN,
1039  thejobIndex,
1040  jobName[iConf],isDA[iConf],isMC[iConf],
1041  applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1042  myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1043  gt[iConf],allFromGT[iConf],
1044  alignmentDB[iConf], alignmentTAG[iConf],
1045  apeDB[iConf], apeTAG[iConf],
1046  bowDB[iConf], bowTAG[iConf],
1047  vertextype[iConf], tracktype[iConf],
1048  refittertype[iConf], ttrhtype[iConf],
1049  applyruncontrol[iConf],
1050  ptcut[iConf],input_CMSSW_BASE,AnalysisStep_dir)
1051 
1052  aJob.setEOSout(eosdir)
1053  aJob.createTheCfgFile(theSrcFiles)
1054  aJob.createTheBashFile()
1055 
1056  output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
1057  if jobN == 0:
1058  theBashDir=aJob.BASH_dir
1059  theBaseName=aJob.getOutputBaseNameWithData()
1060  mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
1061  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
1062  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
1063  del aJob
1064 
1065  job_submit_file = write_HTCondor_submit_file(theBashDir,theBaseName,totalJobs,None)
1066 
1067  if opts.submit:
1068  os.system("chmod u+x "+theBashDir+"/*.sh")
1069  submissionCommand = "condor_submit "+job_submit_file
1070  submissionOutput = getCommandOutput(submissionCommand)
1071  print(submissionOutput)
1072 
1073  fout.write("#!/bin/bash \n")
1074  fout.write("MAIL=$USER@mail.cern.ch \n")
1075  fout.write("OUT_DIR="+eosdir+"\n")
1076  fout.write("FILE="+str(mergedFile)+"\n")
1077  fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1078  fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
1079  fout.write("eval `scram r -sh` \n")
1080  fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
1081  fout.writelines(output_file_list1)
1082  fout.writelines(output_file_list2)
1083  fout.write("\n")
1084  fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1085  fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1086  fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")
1087 
1088  os.system("chmod u+x "+hadd_script_file)
1089 
1090  harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
1091  print(harvest_conditions)
1092  lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
1093  print(lastJobCommand)
1094  if opts.submit:
1095  lastJobOutput = getCommandOutput(lastJobCommand)
1096  print(lastJobOutput)
1097 
1098  fout.close()
1099  del output_file_list1
1100 
1101 
# Standard script entry point: call main() only when this file is executed
# directly (e.g. `python3 submitPVValidationJobs.py ...`), not when imported.
# Fix: stripped the Doxygen line-number prefixes ("1102", "1103") that had been
# fused into the statements, which made this guard syntactically invalid.
if __name__ == "__main__":
    main()
1104 
1105 
1106 
1107 
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def mkdir_eos(out_path)
method to create recursively directories on EOS #############
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)
Definition: main.py:1
#define str(s)
def setEOSout(self, theEOSdir)