CMS 3D CMS Logo

submitPVValidationJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
4 usage:
5 
6 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
7 '''
8 
9 from __future__ import print_function
10 from builtins import range
11 
12 __author__ = 'Marco Musich'
13 __copyright__ = 'Copyright 2020, CERN CMS'
14 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
15 __license__ = 'Unknown'
16 __maintainer__ = 'Marco Musich'
17 __email__ = 'marco.musich@cern.ch'
18 __version__ = 1
19 
import collections
import copy
import datetime
import glob
import json
import multiprocessing
import os
import pickle
import pprint
import re
import shutil
import string
import subprocess
import sys
import time
import warnings
import configparser as ConfigParser
from enum import Enum
from optparse import OptionParser
from subprocess import Popen, PIPE
36 
class RefitType(Enum):
    """Supported flavours of the track refit used by the validation."""
    STANDARD = 1
    COMMON = 2
40 
# Banner embedded in every generated cmsRun configuration and printed at startup.
CopyRights = "\n".join([
    "##################################",
    "# submitPVValidationJobs.py #",
    "# marco.musich@cern.ch #",
    "# April 2020 #",
    "##################################",
]) + "\n"
46 
def check_proxy():
    """Return True if a valid GRID proxy is initialized, False otherwise.

    Runs ``voms-proxy-info --exists`` silently; a non-zero exit code
    (CalledProcessError) means no usable proxy.

    NOTE: the ``def`` header line was missing in the extracted text
    (docstring and body were orphaned) — restored here.
    """
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout=dump, stderr=dump)
    except subprocess.CalledProcessError:
        return False
    return True
59 
60 
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.
    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    if check_proxy():
        # copy the user's proxy next to the job so HTCondor/LSF can see it
        proxy_file = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
        shutil.copyfile(proxy_file, os.path.join(rundir, ".user_proxy"))
    else:
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)
74 
75 
def write_HTCondor_submit_file(path, logs, name, nruns, proxy_path=None):
    """Write an HTCondor 'job_<name>.submit' file into *path* and return its path.

    Arguments:
    - `path`: job directory (also where the per-process shell scripts live)
    - `logs`: directory receiving the .out/.err/.log files
    - `name`: base name of the job scripts
    - `nruns`: number of queued processes
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """
    template_lines = [
        'universe = vanilla',
        'requirements = (OpSysAndVer =?= "AlmaLinux9")',
        'executable = {script:s}',
        'output = {jobm:s}/{out:s}.out',
        'error = {jobm:s}/{out:s}.err',
        'log = {jobm:s}/{out:s}.log',
        'transfer_output_files = ""',
        '+JobFlavour = "{flavour:s}"',
        'queue {njobs:s}',
    ]
    # forward the proxy into the job sandbox only when one was provided
    if proxy_path is not None:
        template_lines.append('+x509userproxy = "{proxy:s}"')
    template = "\n".join(template_lines) + "\n"

    submit_file = os.path.join(path, "job_" + name + ".submit")
    rendered = template.format(script=os.path.join(path, name + "_$(ProcId).sh"),
                               out=name + "_$(ProcId)",
                               jobm=os.path.abspath(logs),
                               flavour="tomorrow",
                               njobs=str(nruns),
                               proxy=proxy_path)
    with open(submit_file, "w") as handle:
        handle.write(rendered)

    return submit_file
111 
112 
def getCommandOutput(command):
    """Execute *command* in a shell and return its standard output as a string.

    Arguments:
    - `command`: Shell command to be invoked by this function.

    On failure the exit status is printed; the (possibly partial) output
    is returned regardless.
    """
    pipe = os.popen(command)
    output = pipe.read()
    status = pipe.close()  # None on success, encoded exit status otherwise
    if status:
        print('%s failed w/ exit code %d' % (command, status))
    return output
125 
126 
def getFilesForRun(blob):
    """Query DAS for the files of one (run, dataset) pair.

    Arguments:
    - `blob`: ((run, dataset), das_instance_or_None) — packed like this so the
      function can be used with multiprocessing.Pool.map.

    Returns the list of file LFNs (empty if the query produced no output).
    """
    (run, dataset), instance = blob[0], blob[1]
    query = " dasgoclient -limit=0 -query 'file run=" + run + " dataset=" + dataset
    if instance is not None:
        query += " instance=" + instance + "'"
    else:
        query += "'"
    proc = Popen(query, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate()
    files = out.decode().split('\n')
    files.pop()  # drop the empty entry after the trailing newline
    return files
137 
138 
def getNEvents(run, dataset):
    """Return the number of events in *dataset* for *run* from a DAS summary query.

    Returns 0 when DAS reports an empty summary ("[]").

    Fix: ``subprocess.check_output`` returns *bytes* in Python 3, so the old
    comparison with the str ``"[]\\n"`` was always False and ``int(b"[]\\n")``
    would raise ValueError on an empty summary — decode before comparing.
    """
    nEvents = subprocess.check_output(
        ["das_client", "--limit", "0", "--query",
         "summary run={} dataset={} | grep summary.nevents".format(run, dataset)]
    ).decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
143 
144 
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Return a {run: recorded_lumi(/pb)} dict from a brilcalc csv query.

    Expects csv output like
     +-------+------+--------+--------+-------------------+------------------+
     | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
     +-------+------+--------+--------+-------------------+------------------+
     |  73   | 327  | 142418 | 138935 | 19.562            | 18.036           |
     +-------+------+--------+--------+-------------------+------------------+
    and extracts the total recorded luminosity per run.

    Returns an empty dict when *isRunBased* is False or the BRIL query fails
    (best-effort: a warning is emitted, callers proceed without lumi info).

    Fix: narrowed the bare ``except:`` to ``except Exception`` so that
    SystemExit/KeyboardInterrupt are no longer swallowed.
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        output = subprocess.check_output(
            [homedir + "/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
             "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
             "--output-style", "csv", "-c", "web"])
    except Exception:
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if "#" not in line:  # skip csv comment/header lines
            runToCache = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
182 
183 
def isInJSON(run, jsonfile):
    """Return True if *run* appears as a key of the JSON lumi-mask *jsonfile*.

    On any failure (missing/unreadable/invalid file) a warning is emitted and
    True is returned, i.e. all runs are accepted — deliberate best-effort.

    Fix: narrowed the bare ``except:`` to ``except Exception`` so that
    SystemExit/KeyboardInterrupt are no longer swallowed.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
192 
193 
def as_dict(config):
    """Convert a ConfigParser instance into a plain nested dict
    {section: {option: value}} with string values."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
203 
204 
def to_bool(value):
    """
    Converts 'something' to boolean. Raises exception for invalid formats
    Possible True values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    normalized = str(value).lower()
    if normalized in ("yes", "y", "true", "t", "1"):
        return True
    if normalized in ("no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"):
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
215 
216 
def updateDB2():
    """Rebuild the pickled run-info DB ('runInfo.pkl' in the cwd) by scanning
    root-files/Run*.root.

    NOTE(review): this function references ``glob``, ``runFromFilename``,
    ``getRunStartTime`` and ``isValid``, none of which are defined or imported
    in this file — calling it as-is raises NameError. Presumably dead code or
    reliant on helpers from a sibling module; TODO confirm before use.
    """
    dbName = "runInfo.pkl"
    infos = {}
    # load any existing DB so previously cached runs are preserved
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            # NOTE(review): stored at the top level rather than under
            # infos[run] — looks like it should be infos[run]["isValid"];
            # verify against the intended schema.
            infos["isValid"] = isValid(f)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
234 
235 
def updateDB(run, runInfo):
    """Insert *runInfo* for *run* into the pickled 'runInfo.pkl' DB in the
    current directory. Existing entries are never overwritten."""
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as handle:
            infos = pickle.load(handle)

    # keep the first record seen for a run; later calls are no-ops
    infos.setdefault(run, runInfo)

    with open(dbName, "wb") as handle:
        pickle.dump(infos, handle)
249 
250 
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser variant used by the alignment tooling.

    Differences from the stock parser:
    - option names keep their case (``optionxform`` is the identity);
    - ``local<Section>`` sections can override options of ``<Section>``;
    - convenience helpers ``exists`` and ``getResultingSection``.

    Fix applied: ``getResultingSection`` used mutable default arguments
    (``defaultDict={}, demandPars=[]``); replaced by ``None`` sentinels —
    behavior-compatible for all callers.
    """

    def optionxform(self, optionstr):
        # identity transform: preserve the case of option names
        return optionstr

    def exists(self, section, option):
        """Return True if *option* is present in *section*
        (False when the section itself is missing)."""
        try:
            items = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        for item in items:
            if item[0] == option:
                return True
        return False

    def __updateDict(self, dictionary, section):
        """Merge all options of *section* (and of 'local<Section>', which wins)
        into *dictionary*; returns the same dict object."""
        result = dictionary
        try:
            for option in self.options(section):
                result[option] = self.get(section, option)
            if "local"+section.title() in self.sections():
                for option in self.options("local"+section.title()):
                    result[option] = self.get("local"+section.title(), option)
        except ConfigParser.NoSectionError as section:
            msg = ("%s in configuration files. This section is mandatory."
                   % (str(section).replace(":", "", 1)))
            # NOTE: the error is currently swallowed; the original code
            # raised AllInOneError(msg) here.
            #raise AllInOneError(msg)
        return result

    def getResultingSection(self, section, defaultDict=None, demandPars=None):
        """Build the effective option dict for *section*.

        - `defaultDict`: starting values (deep-copied, never mutated)
        - `demandPars`: options that must be found in the section or in its
          'local...' counterpart; on failure only an error message is built
          (the AllInOneError raise is commented out upstream).
        """
        result = copy.deepcopy(defaultDict) if defaultDict is not None else {}
        for option in (demandPars if demandPars is not None else []):
            try:
                result[option] = self.get(section, option)
            except ConfigParser.NoOptionError as globalSectionError:
                # recover the option name from the exception text
                globalSection = str(globalSectionError).split("'")[-2]
                splittedSectionName = section.split(":")
                if len(splittedSectionName) > 1:
                    localSection = ("local"+section.split(":")[0].title()+":"
                                    + section.split(":")[1])
                else:
                    localSection = ("local"+section.split(":")[0].title())
                if self.has_section(localSection):
                    try:
                        result[option] = self.get(localSection, option)
                    except ConfigParser.NoOptionError as option:
                        msg = ("%s. This option is mandatory."
                               % (str(option).replace(":", "", 1).replace(
                                   "section",
                                   "section '"+globalSection+"' or", 1)))
                        #raise AllInOneError(msg)
                else:
                    msg = ("%s. This option is mandatory."
                           % (str(globalSectionError).replace(":", "", 1)))
                    #raise AllInOneError(msg)
        result = self.__updateDict(result, section)
        return result
313 
314 
def ConfigSectionMap(config, section):
    """Return {option: value} for every option of *section*.

    Options whose read fails are reported and stored as None.

    Fixes: the original called an undefined ``DebugPrint`` (NameError, masked
    by a bare ``except``) — replaced with ``print``; the bare ``except`` was
    narrowed to ``except Exception``.
    """
    the_dict = {}
    options = config.options(section)
    for option in options:
        try:
            the_dict[option] = config.get(section, option)
            if the_dict[option] == -1:
                # unreachable with a standard ConfigParser (values are str);
                # kept for parity with the original intent
                print("skip: %s" % option)
        except Exception:
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
327 
328 
def mkdir_eos(out_path):
    """Create *out_path* on EOS piece by piece via 'eos mkdir', then verify it
    with 'eos ls' (printing the listing output on failure)."""
    print("============== creating", out_path)
    newpath = '/'
    for part in out_path.split('/'):
        newpath = os.path.join(newpath, part)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            #getCommandOutput("eos mkdir"+newpath)
            proc = subprocess.Popen("eos mkdir " + newpath, shell=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = proc.communicate()
            print("============== created ", out_path)
            #print(out,err)
            proc.wait()

    # now check that the directory exists
    proc = subprocess.Popen("eos ls " + out_path, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = proc.communicate()
    proc.wait()
    if proc.returncode != 0:
        print(out)
351 
def split(sequence, size):
    """Yield successive chunks of *sequence* of length *size*
    (the last chunk may be shorter)."""
    return (sequence[start:start + size]
            for start in range(0, len(sequence), size))
360 
361 
class Job:
    """One PV-validation batch job.

    Knows how to render the cmsRun configuration from the
    PVValidation_T_cfg.py template, to write the LSF and bash wrapper
    scripts, and to submit itself via bsub (LSF path).
    """

    def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir):
        """Store all per-job parameters.

        Most string parameters are substituted verbatim into the cmsRun
        template, so booleans are expected as the strings 'True'/'False'.
        - `dataset`: full DAS dataset path, e.g. /HLTPhysics/Run2016C-.../ALCARECO
        - `extraconditions`: dict {record: 'connect,tag[,label]'} of extra DB payloads
        - `CMSSW_dir`, `the_dir`: CMSSW area and working directory
        """
        # e.g. '/HLTPhysics/Run2016C-TkAlMinBias-...-v1/ALCARECO'
        #   -> 'HLTPhysics_Run2016C'
        theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None   # filled by submit() with the scheduler's reply
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir = the_dir
        self.CMSSW_dir = CMSSW_dir

        # base names of all produced artifacts (cfg, scripts, root files)
        self.output_full_name = self.getOutputBaseName()+"_"+str(self.job_id)
        self.output_number_name = self.getOutputBaseNameWithData()+"_"+str(self.job_number)

        self.cfg_dir = None
        self.outputCfgName = None

        # LSF variables
        self.LSF_dir = None
        self.BASH_dir = None
        self.output_LSF_name = None
        self.output_BASH_name = None

        self.lfn_list = list()

    def __del__(self):
        del self.lfn_list

    def setEOSout(self, theEOSdir):
        # EOS directory the job copies its outputs to (used by the wrappers)
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Return the output base name (without dataset / job-number suffix)."""
        return "PVValidation_"+self.job_name

    def getOutputBaseNameWithData(self):
        """Return the output base name including the compacted dataset name."""
        return "PVValidation_"+self.job_name+"_"+self.data

    def createTheCfgFile(self, lfn):
        """Render the cmsRun configuration for this job into <the_dir>/cfg.

        Arguments:
        - `lfn`: list of input-file LFNs for this job
        Replaces the *TEMPLATE placeholders of PVValidation_T_cfg.py with the
        job's parameters and appends any extra-conditions snippets.
        """
        global CopyRights
        # write the cfg file

        self.cfg_dir = os.path.join(self.the_dir, "cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName = self.output_full_name+"_cfg.py"
        fout = open(os.path.join(self.cfg_dir, self.outputCfgName), 'w')

        template_cfg_file = os.path.join(self.CMSSW_dir, "src/Alignment/OfflineValidation/test", "PVValidation_T_cfg.py")
        # NOTE: 'file' shadows the (py2) builtin; kept as-is
        file = open(template_cfg_file, 'r')

        config_txt = '\n\n' + CopyRights + '\n\n'
        config_txt += file.read()
        # placeholder -> value substitutions; all values are strings
        config_txt = config_txt.replace("ISDATEMPLATE", self.isDA)
        config_txt = config_txt.replace("ISMCTEMPLATE", self.isMC)
        config_txt = config_txt.replace("APPLYBOWSTEMPLATE", self.applyBOWS)
        config_txt = config_txt.replace("EXTRACONDTEMPLATE", self.applyEXTRACOND)
        config_txt = config_txt.replace("USEFILELISTTEMPLATE", "True")
        config_txt = config_txt.replace("RUNBOUNDARYTEMPLATE", self.runboundary)
        config_txt = config_txt.replace("LUMILISTTEMPLATE", self.lumilist)
        config_txt = config_txt.replace("MAXEVENTSTEMPLATE", self.maxevents)
        config_txt = config_txt.replace("GLOBALTAGTEMPLATE", self.gt)
        config_txt = config_txt.replace("ALLFROMGTTEMPLATE", self.allFromGT)
        config_txt = config_txt.replace("ALIGNOBJTEMPLATE", self.alignmentDB)
        config_txt = config_txt.replace("GEOMTAGTEMPLATE", self.alignmentTAG)
        config_txt = config_txt.replace("APEOBJTEMPLATE", self.apeDB)
        config_txt = config_txt.replace("ERRORTAGTEMPLATE", self.apeTAG)
        config_txt = config_txt.replace("BOWSOBJECTTEMPLATE", self.bowDB)
        config_txt = config_txt.replace("BOWSTAGTEMPLATE", self.bowTAG)
        config_txt = config_txt.replace("VERTEXTYPETEMPLATE", self.vertextype)
        config_txt = config_txt.replace("TRACKTYPETEMPLATE", self.tracktype)
        config_txt = config_txt.replace("REFITTERTEMPLATE", self.refittertype)
        config_txt = config_txt.replace("TTRHBUILDERTEMPLATE", self.ttrhtype)
        config_txt = config_txt.replace("PTCUTTEMPLATE", self.ptcut)
        config_txt = config_txt.replace("INTLUMITEMPLATE", self.intlumi)
        config_txt = config_txt.replace("RUNCONTROLTEMPLATE", self.applyruncontrol)
        lfn_with_quotes = map(lambda x: "\'"+x+"\'", lfn)
        config_txt = config_txt.replace("FILESOURCETEMPLATE", "["+",".join(lfn_with_quotes)+"]")
        config_txt = config_txt.replace("OUTFILETEMPLATE", self.output_full_name+".root")

        # build the extra-conditions PoolDBESSource snippets, one per record
        textToWrite = ''
        for element in self.extraCondVect:
            if("Rcd" in element):
                params = self.extraCondVect[element].split(',')
                text = '''\n
process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
     connect = cms.string('{database}'),
     toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
                                tag = cms.string('{tag}'),
                                label = cms.untracked.string('{label}')
                                )
                       )
     )
process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
'''.format(record = element, database = params[0], tag = params[1], label = (params[2] if len(params)>2 else ''))
                textToWrite += text

        if(self.applyEXTRACOND == "True"):
            if not self.extraCondVect:
                raise Exception('Requested extra conditions, but none provided')

            config_txt = config_txt.replace("END OF EXTRA CONDITIONS", textToWrite)
        else:
            print("INFO: Will not apply any extra conditions")
            pass

        fout.write(config_txt)

        file.close()
        fout.close()

    def createTheLSFFile(self):
        """Write the LSF (bsub) wrapper script into <the_dir>/LSF.

        The script runs cmsRun on the generated cfg and copies all .root/.txt
        outputs to EOS via xrdcp.
        """
        # directory to store the LSF to be submitted
        self.LSF_dir = os.path.join(self.the_dir, "LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name = self.output_full_name+".lsf"
        fout = open(os.path.join(self.LSF_dir, self.output_LSF_name), 'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir, "log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/sh \n")
        fout.write("#BSUB -L /bin/sh\n")
        fout.write("#BSUB -J "+job_name+"\n")
        fout.write("#BSUB -o "+os.path.join(log_dir, job_name+".log")+"\n")
        fout.write("#BSUB -q cmscaf1nd \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=`pwd` \n")
        fout.write("cd "+os.path.join(self.CMSSW_dir, "src")+" \n")
        fout.write("eval `scram runtime -sh` \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cmsRun "+os.path.join(self.cfg_dir, self.outputCfgName)+" \n")
        fout.write("ls -lh . \n")
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

        fout.close()

    def createTheBashFile(self, isUnitTest):
        """Write the bash wrapper (HTCondor executable) into <the_dir>/BASH.

        Arguments:
        - `isUnitTest`: when True, skip the xrdcp stage-out to EOS
        """
        # directory to store the BASH to be submitted
        self.BASH_dir = os.path.join(self.the_dir, "BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name = self.output_number_name+".sh"
        fout = open(os.path.join(self.BASH_dir, self.output_BASH_name), 'w')

        job_name = self.output_full_name

        fout.write("#!/bin/bash \n")
        #fout.write("export EOS_MGM_URL=root://eoscms.cern.ch \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("echo \"Job started at \" `date` \n")
        fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir, "src")+" \n")
        # proxy forwarded into the release area by forward_proxy()
        fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=$PWD \n")
        #fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("cd ${CMSSW_DIR} \n")
        fout.write("eval `scramv1 runtime -sh` \n")
        fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cp "+os.path.join(self.cfg_dir, self.outputCfgName)+" . \n")
        fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
        fout.write("cmsRun "+self.outputCfgName+" \n")
        fout.write("echo \"Content of working dir is:\" \n")
        fout.write("ls -lh | sort \n")
        #fout.write("less condor_exec.exe \n")
        if(not isUnitTest):
            fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        #fout.write("mv ${JobName}.out ${CMSSW_DIR}/BASH \n")
        fout.write("echo \"Job ended at \" `date` \n")
        fout.write("exit 0 \n")

        fout.close()

    def getOutputFileName(self):
        """Return the full EOS path of the job's output ROOT file."""
        return os.path.join(self.OUTDIR, self.output_full_name+".root")

    def submit(self):
        """Submit the LSF script via bsub; stores the scheduler reply in
        self.batch_job_id (parsed later by getBatchjobId)."""
        print("submit job", self.job_id)
        job_name = self.output_full_name
        submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir, self.output_LSF_name)
        child1 = os.system(submitcommand1)
        #submitcommand2 = "bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name)
        #child2 = os.system(submitcommand2)
        self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir, self.output_LSF_name))

    def getBatchjobId(self):
        """Extract the numeric job id from the bsub reply
        (format: 'Job <id> is submitted to queue ...')."""
        return self.batch_job_id.split("<")[1].split(">")[0]
603 
604 
605 def main():
607 
608 
609  if not check_proxy():
610  print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
611  sys.exit(1)
612 
613 
614  forward_proxy(".")
615 
616  global CopyRights
617  print('\n'+CopyRights)
618 
619  HOME = os.environ.get('HOME')
621  # CMSSW section
622  input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
623  lib_path = os.path.abspath(os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test"))
624  sys.path.append(lib_path)
625 
626 
627  srcFiles = []
629  desc="""This is a description of %prog."""
630  parser = OptionParser(description=desc,version='%prog version 0.1')
631  parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
632  parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
633  parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
634  parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
635  parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
636  parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
637  parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
638  parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
639  parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
640  parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
641  (opts, args) = parser.parse_args()
643  now = datetime.datetime.now()
644  #t = now.strftime("test_%Y_%m_%d_%H_%M_%S_")
645  #t = "2016UltraLegacy"
646  #t = "2017UltraLegacy"
647  #t = "2018UltraLegacy"
648  t=""
649  t+=opts.taskname
650 
651  USER = os.environ.get('USER')
652  eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
653 
654  if opts.submit:
655  mkdir_eos(eosdir)
656  else:
657  print("Not going to create EOS folder. -s option has not been chosen")
658 
659 
660 
661  jobName = []
662  isMC = []
663  isDA = []
664  doRunBased = []
665  maxevents = []
666 
667  gt = []
668  allFromGT = []
669  applyEXTRACOND = []
670  extraCondVect = []
671  alignmentDB = []
672  alignmentTAG = []
673  apeDB = []
674  apeTAG = []
675  applyBOWS = []
676  bowDB = []
677  bowTAG = []
678  conditions = []
679 
680  vertextype = []
681  tracktype = []
682  refittertype = []
683  ttrhtype = []
684 
685  applyruncontrol = []
686  ptcut = []
687  runboundary = []
688  lumilist = []
689 
690  ConfigFile = opts.inputconfig
691 
692  if ConfigFile is not None:
693 
694  print("********************************************************")
695  print("* Parsing from input file:", ConfigFile," ")
696 
697  config = BetterConfigParser()
698  config.read(ConfigFile)
699 
700  print("Parsed the following configuration \n\n")
701  inputDict = as_dict(config)
702  pprint.pprint(inputDict)
703 
704  if(not bool(inputDict)):
705  raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
706 
707  #print config.sections()
708 
709  # please notice: since in principle one wants to run on several different samples simultaneously,
710  # all these inputs are vectors
711 
712  doRunBased = opts.doRunBased
713 
714  listOfValidations = config.getResultingSection("validations")
715 
716  for item in listOfValidations:
717  if (bool(listOfValidations[item]) == True):
718 
719  jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
720  isDA.append(ConfigSectionMap(config,"Job")['isda'])
721  isMC.append(ConfigSectionMap(config,"Job")['ismc'])
722  maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
723 
724  gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
725  allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
726  applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
727  conditions.append(config.getResultingSection("ExtraConditions"))
728 
729  alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
730  alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
731  apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
732  apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
733  applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
734  bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
735  bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
736 
737  vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
738  tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
739 
740 
741 
742  if(config.exists("Refit","refittertype")):
743  refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
744  else:
745  refittertype.append(str(RefitType.COMMON))
746 
747  if(config.exists("Refit","ttrhtype")):
748  ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
749  else:
750  ttrhtype.append("WithAngleAndTemplate")
751 
752  applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
753  ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
754  runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
755  lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
756  else :
757 
758  print("********************************************************")
759  print("* Parsing from command line *")
760  print("********************************************************")
761 
762  jobName = ['testing']
763  isDA = ['True']
764  isMC = ['True']
765  doRunBased = opts.doRunBased
766  maxevents = ['10000']
767 
768  gt = ['74X_dataRun2_Prompt_v4']
769  allFromGT = ['False']
770  applyEXTRACOND = ['False']
771  conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
772  alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
773  alignmentTAG = ['TrackerAlignment_Prompt']
774  apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
775  apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
776  applyBOWS = ['True']
777  bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
778  bowTAG = ['TrackerSurafceDeformations_v1_express']
779 
780  vertextype = ['offlinePrimaryVertices']
781  tracktype = ['ALCARECOTkAlMinBias']
782 
783  applyruncontrol = ['False']
784  ptcut = ['3']
785  runboundary = ['1']
786  lumilist = ['']
787 
788  # print some of the configuration
789 
790  print("********************************************************")
791  print("* Configuration info *")
792  print("********************************************************")
793  print("- submitted : ",opts.submit)
794  print("- taskname : ",opts.taskname)
795  print("- Jobname : ",jobName)
796  print("- use DA : ",isDA)
797  print("- is MC : ",isMC)
798  print("- is run-based: ",doRunBased)
799  print("- evts/job : ",maxevents)
800  print("- GlobatTag : ",gt)
801  print("- allFromGT? : ",allFromGT)
802  print("- extraCond? : ",applyEXTRACOND)
803  print("- extraCond : ",conditions)
804  print("- Align db : ",alignmentDB)
805  print("- Align tag : ",alignmentTAG)
806  print("- APE db : ",apeDB)
807  print("- APE tag : ",apeTAG)
808  print("- use bows? : ",applyBOWS)
809  print("- K&B db : ",bowDB)
810  print("- K&B tag : ",bowTAG)
811  print("- VertexColl : ",vertextype)
812  print("- TrackColl : ",tracktype)
813  print("- RefitterSeq : ",refittertype)
814  print("- TTRHBuilder : ",ttrhtype)
815  print("- RunControl? : ",applyruncontrol)
816  print("- Pt> ",ptcut)
817  print("- run= ",runboundary)
818  print("- JSON : ",lumilist)
819  print("- Out Dir : ",eosdir)
820 
821  print("********************************************************")
822  print("Will run on",len(jobName),"workflows")
823 
824  myRuns = []
825  mylist = {}
827  if(doRunBased):
828  print(">>>> This is Data!")
829  print(">>>> Doing run based selection")
830  cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
831  p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
832  out, err = p.communicate()
833  #print(out)
834  listOfRuns=out.decode().split("\n")
835  listOfRuns.pop()
836  listOfRuns.sort()
837  print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
838 
839  mytuple=[]
841  print("first run:",opts.start,"last run:",opts.end)
842 
843  for run in listOfRuns:
844  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
845  print("excluding",run)
846  continue
847 
848  if not isInJSON(run,lumilist[0]):
849  continue
850 
851  else:
852  print("'======> taking",run)
853  #print "preparing run",run
854  #if(int(run)%100==0):
855  mytuple.append((run,opts.data))
856 
857  #print mytuple
858 
859  instances=[opts.instance for entry in mytuple]
860  pool = multiprocessing.Pool(processes=20) # start 20 worker processes
861  count = pool.map(getFilesForRun,zip(mytuple,instances))
862  file_info = dict(zip(listOfRuns, count))
864  #print file_info
865 
866  for run in listOfRuns:
867  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
868  print('rejecting run',run,' becasue outside of boundaries')
869  continue
870 
871  if not isInJSON(run,lumilist[0]):
872  print('rejecting run',run,' becasue outside not in JSON')
873  continue
874 
875  #if(int(run)%100==0):
876  # print "preparing run",run
877  myRuns.append(run)
878  #cmd2 = ' das_client --limit=0 --query \'file run='+run+' dataset='+opts.data+'\''
879  #q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
880  #out2, err2 = q.communicate()
881 
882  #out2=getFilesForRun((run,opts.data))
883  #print out2
884  #pool.map(getFilesForRun,run,opts.data)
885 
886 
887  #if run in file_info:
888  #mylist[run] = file_info[run]
889  #print run,mylist[run]
890  #mylist[run] = out2.split('\n')
891  #print mylist
892  #mylist[run].pop()
893  #print mylist
894 
895  od = collections.OrderedDict(sorted(file_info.items()))
896  # print od
897 
898 
899  if(len(myRuns)==0):
900  if(opts.isUnitTest):
901  print('\n')
902  print('=' * 70)
903  print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
904  print('=' * 70)
905  print('\n')
906  sys.exit(0)
907  else:
908  raise Exception('Will not run on any run.... please check again the configuration')
909  else:
910  # get from the DB the int luminosities
911  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
913  if(opts.verbose):
914  pprint.pprint(myLumiDB)
915 
916  # start loop on samples
917  for iConf in range(len(jobName)):
918  print("This is Task n.",iConf+1,"of",len(jobName))
919 
920 
921 
922  # for hadd script
923  scripts_dir = "scripts"
924  if not os.path.exists(scripts_dir):
925  os.makedirs(scripts_dir)
926  hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
927  fout = open(hadd_script_file,'w')
928 
929  output_file_list1=list()
930  output_file_list2=list()
931  output_file_list2.append("hadd ")
932 
933  inputFiles = []
935  if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
936  if(to_bool(isMC[iConf])):
937  print("this is MC")
938  cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
939  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
940  out,err = s.communicate()
941  mylist = out.decode().split('\n')
942  mylist.pop()
943  #print mylist
944 
945  splitList = split(mylist,10)
946  for files in splitList:
947  inputFiles.append(files)
948  myRuns.append(str(1))
949  else:
950  print("this is DATA (not doing full run-based selection)")
951  print(runboundary[iConf])
952  cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
953  #print cmd
954  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
955  out,err = s.communicate()
956  #print(out)
957  mylist = out.decode().split('\n')
958  mylist.pop()
959  #print "len(mylist):",len(mylist)
960  print("mylist:",mylist)
961 
962  splitList = split(mylist,10)
963  for files in splitList:
964  inputFiles.append(files)
965  myRuns.append(str(runboundary[iConf]))
966 
967  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)
968 
969  else:
970  #pass
971  for element in od:
972  #print mylist[element]
973  inputFiles.append(od[element])
974  #print element,od[element]
975  #print mylist
976 
977  #print inputFiles
978 
979 
980  batchJobIds = []
981  mergedFile = None
983  if(opts.verbose):
984  print("myRuns =====>",myRuns)
985 
986  totalJobs=0
987  theBashDir=None
988  theBaseName=None
990  for jobN,theSrcFiles in enumerate(inputFiles):
991  if(opts.verbose):
992  print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
993  else:
994  print("JOB:",jobN,"run",myRuns[jobN])
995  thejobIndex=None
996  theLumi='1'
998  #if(to_bool(isMC[iConf]) and (not to_bool(doRunBased))):
999  if(to_bool(isMC[iConf])):
1000  thejobIndex=jobN
1001  else:
1002  if(doRunBased):
1003  thejobIndex=myRuns[jobN]
1004  else:
1005  thejobIndex=myRuns[jobN]+"_"+str(jobN)
1006 
1007  if (myRuns[jobN]) in myLumiDB:
1008  theLumi = myLumiDB[myRuns[jobN]]
1009  else:
1010  print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
1011  theLumi='1'
1012  print("int. lumi:",theLumi,"/pb")
1013 
1014  #print 'the configuration is:',iConf,' theJobIndex is:',thejobIndex
1015  #print applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf]
1016 
1017  runInfo = {}
1018  runInfo["run"] = myRuns[jobN]
1019  #runInfo["runevents"] = getNEvents(myRuns[jobN],opts.data)
1020  runInfo["conf"] = jobName[iConf]
1021  runInfo["gt"] = gt[iConf]
1022  runInfo["allFromGT"] = allFromGT[iConf]
1023  runInfo["alignmentDB"] = alignmentDB[iConf]
1024  runInfo["alignmentTag"] = alignmentTAG[iConf]
1025  runInfo["apeDB"] = apeDB[iConf]
1026  runInfo["apeTag"] = apeTAG[iConf]
1027  runInfo["applyBows"] = applyBOWS[iConf]
1028  runInfo["bowDB"] = bowDB[iConf]
1029  runInfo["bowTag"] = bowTAG[iConf]
1030  runInfo["ptCut"] = ptcut[iConf]
1031  runInfo["lumilist"] = lumilist[iConf]
1032  runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
1033  runInfo["conditions"] = conditions[iConf]
1034  runInfo["nfiles"] = len(theSrcFiles)
1035  runInfo["srcFiles"] = theSrcFiles
1036  runInfo["intLumi"] = theLumi
1037 
1038  updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1039 
1040  totalJobs=totalJobs+1
1041 
1042  aJob = Job(opts.data,
1043  jobN,
1044  thejobIndex,
1045  jobName[iConf],isDA[iConf],isMC[iConf],
1046  applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1047  myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1048  gt[iConf],allFromGT[iConf],
1049  alignmentDB[iConf], alignmentTAG[iConf],
1050  apeDB[iConf], apeTAG[iConf],
1051  bowDB[iConf], bowTAG[iConf],
1052  vertextype[iConf], tracktype[iConf],
1053  refittertype[iConf], ttrhtype[iConf],
1054  applyruncontrol[iConf],
1055  ptcut[iConf],input_CMSSW_BASE,os.getcwd())
1056 
1057  aJob.setEOSout(eosdir)
1058  aJob.createTheCfgFile(theSrcFiles)
1059  aJob.createTheBashFile(opts.isUnitTest)
1060 
1061  output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
1062  if jobN == 0:
1063  theBashDir=aJob.BASH_dir
1064  theBaseName=aJob.getOutputBaseNameWithData()
1065  mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
1066  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
1067  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
1068  del aJob
1069 
1070 
1071  theLogDir = os.path.join(os.getcwd(),"log")
1072  if not os.path.exists(theLogDir):
1073  os.makedirs(theLogDir)
1074 
1075  job_submit_file = write_HTCondor_submit_file(theBashDir,theLogDir,theBaseName,totalJobs,None)
1076  os.system("chmod u+x "+theBashDir+"/*.sh")
1077 
1078  if opts.submit:
1079  submissionCommand = "condor_submit "+job_submit_file
1080  submissionOutput = getCommandOutput(submissionCommand)
1081  print(submissionOutput)
1082 
1083  fout.write("#!/bin/bash \n")
1084  fout.write("MAIL=$USER@mail.cern.ch \n")
1085  fout.write("OUT_DIR="+eosdir+"\n")
1086  fout.write("FILE="+str(mergedFile)+"\n")
1087  fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1088  fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
1089  fout.write("eval `scram r -sh` \n")
1090  fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
1091  fout.writelines(output_file_list1)
1092  fout.writelines(output_file_list2)
1093  fout.write("\n")
1094  fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1095  fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1096  fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")
1097 
1098  os.system("chmod u+x "+hadd_script_file)
1099 
1100  harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
1101  print(harvest_conditions)
1102  lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
1103  print(lastJobCommand)
1104  if opts.submit:
1105  lastJobOutput = getCommandOutput(lastJobCommand)
1106  print(lastJobOutput)
1107 
1108  fout.close()
1109  del output_file_list1
1110 
1111 
# Standard script entry point: invoke main() only when this file is executed
# directly (e.g. `submitPVValidationJobs.py -j TEST ...`), not when imported.
if __name__ == "__main__":
    main()
1114 
1115 
1116 
1117 
def write_HTCondor_submit_file(path, logs, name, nruns, proxy_path=None)
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
ALPAKA_FN_HOST_ACC ALPAKA_FN_INLINE constexpr float zip(ConstView const &tracks, int32_t i)
Definition: TracksSoA.h:90
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def mkdir_eos(out_path)
method to recursively create directories on EOS #############
def createTheBashFile(self, isUnitTest)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
Definition: main.py:1
#define str(s)
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
def setEOSout(self, theEOSdir)