CMS 3D CMS Logo

submitPVValidationJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
4 usage:
5 
6 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
7 '''
8 
9 from __future__ import print_function
10 from builtins import range
11 
12 __author__ = 'Marco Musich'
13 __copyright__ = 'Copyright 2020, CERN CMS'
14 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
15 __license__ = 'Unknown'
16 __maintainer__ = 'Marco Musich'
17 __email__ = 'marco.musich@cern.ch'
18 __version__ = 1
19 
20 import datetime,time
21 import os,sys
22 import copy
23 import pickle
24 import string, re
25 import configparser as ConfigParser
26 import json
27 import pprint
28 import subprocess
29 from optparse import OptionParser
30 from subprocess import Popen, PIPE
31 import collections
32 import warnings
33 import shutil
34 import multiprocessing
35 from enum import Enum
36 
class RefitType(Enum):
    """Enumerates the supported track-refit flavours used when building the
    validation configuration (see the Refit section of the input .ini file)."""
    STANDARD = 1
    COMMON = 2
40 
# Banner prepended to every generated cmsRun configuration and printed at
# start-up; built from adjacent string literals instead of repeated `+=`.
CopyRights = ('##################################\n'
              '# submitPVValidationJobs.py #\n'
              '# marco.musich@cern.ch #\n'
              '# April 2020 #\n'
              '##################################\n')
46 
47 
def check_proxy():
    """Check if a GRID proxy has been initialized.

    Returns:
        True when ``voms-proxy-info --exists`` succeeds, False when the proxy
        is missing/expired or when the voms tools are not installed.
    """
    # NOTE: the `def` line was missing in the scraped source and has been
    # restored here; the function is called by forward_proxy() and main().
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout=dump, stderr=dump)
    except (subprocess.CalledProcessError, OSError):
        # OSError: voms-proxy-info binary not available on this host —
        # treat it the same as "no valid proxy" instead of crashing
        return False
    return True
59 
60 
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.
    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    # refuse to continue without a valid GRID proxy
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    proxy_source = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    proxy_target = os.path.join(rundir, ".user_proxy")
    shutil.copyfile(proxy_source, proxy_target)
74 
75 
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Writes 'job_<name>.submit' file in `path` and returns its full path.
    Arguments:
    - `path`: job directory
    - `name`: base name of the per-job shell scripts
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """
    template = """\
universe = vanilla
requirements = (OpSysAndVer =?= "CentOS7")
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
queue {njobs:s}
"""
    if proxy_path is not None:
        template += """\
+x509userproxy = "{proxy:s}"
"""

    # all placeholders resolved in one shot; $(ProcId) is expanded by HTCondor
    fields = dict(script=os.path.join(path, name + "_$(ProcId).sh"),
                  out=name + "_$(ProcId)",
                  jobm=os.path.abspath(path),
                  flavour="tomorrow",
                  njobs=str(nruns),
                  proxy=proxy_path)

    submit_path = os.path.join(path, "job_" + name + ".submit")
    with open(submit_path, "w") as submit_file:
        submit_file.write(template.format(**fields))

    return submit_path
111 
112 
def getCommandOutput(command):
    """Run `command` through a shell and return its standard output.
    Arguments:
    - `command`: Shell command to be invoked by this function.

    On a non-zero exit status the failure is reported on stdout, but the
    captured output is returned regardless.
    """
    pipe = os.popen(command)
    captured = pipe.read()
    status = pipe.close()  # None on success, exit status otherwise
    if status:
        print('%s failed w/ exit code %d' % (command, status))
    return captured
125 
126 
def getFilesForRun(blob):
    """Return the list of file names for one (run, dataset) pair via dasgoclient.

    Arguments:
    - `blob`: packed tuple ((run, dataset), instance); `instance` is an
      optional DAS instance string appended to the query only when not None.
      The single packed argument makes this usable with multiprocessing.Pool.map.
    """
    cmd2 = ' dasgoclient -limit=0 -query \'file run='+blob[0][0]+' dataset='+blob[0][1]+ (' instance='+blob[1]+'\'' if (blob[1] is not None) else '\'')
    #cmd2 = 'dasgoclient -query \'file run='+blob[0]+' dataset='+blob[1]+'\''
    q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
    out, err = q.communicate()
    #print(cmd2,'\n',out.rstrip('\n'))
    outputList = out.decode().split('\n')
    # drop the trailing empty element produced by the final newline
    outputList.pop()
    return outputList #,err
137 
138 
def getNEvents(run, dataset):
    """Return the number of events in `dataset` for `run`, queried via DAS.

    Returns 0 when DAS replies with an empty summary ("[]").
    """
    # bug fix: subprocess.check_output returns *bytes* in Python 3, so the
    # comparison with "[]\n" was always False and int() would then fail on
    # b"[]\n"; decode once before inspecting the reply
    nEvents = subprocess.check_output(
        ["das_client", "--limit", "0", "--query",
         "summary run={} dataset={} | grep summary.nevents".format(run, dataset)]
    ).decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
143 
144 
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Return a {run: recorded luminosity (/pb)} dict for [minRun, maxRun].

    Queries brilcalc, whose CSV output looks like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls | ncms | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73 | 327 | 142418 | 138935 | 19.562 | 18.036 |
    +-------+------+--------+--------+-------------------+------------------+
    and extracts, per run, the total recorded luminosity (last CSV column).

    Returns an empty dict when the validation is not run-based or when the
    BRIL DB cannot be queried (best-effort: a warning is emitted instead of
    failing the whole submission).
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        output = subprocess.check_output(
            [homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
             "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
             "--output-style", "csv", "-c", "web"])
    except Exception:
        # narrowed from a bare `except:` so SystemExit/KeyboardInterrupt are
        # no longer swallowed; any brilcalc failure still degrades gracefully
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if "#" not in line:  # skip header/comment lines
            # CSV data line: "run:fill,...,totrecorded"; cache run -> lumi
            runToCache = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
182 
183 
def isInJSON(run, jsonfile):
    """Return True if `run` is listed in the JSON lumi-mask file `jsonfile`.

    Falls back to True (i.e. accept every run) with a warning when the file
    cannot be opened or parsed.

    NOTE(review): the membership test compares `run` against the JSON keys,
    which are strings — callers must pass the run as a string to match.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        # narrowed from a bare `except:`; missing/corrupt mask is tolerated
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
192 
193 
def as_dict(config):
    """Return the contents of a ConfigParser as a nested plain dictionary:
    {section: {option: value}}."""
    return {
        section: {
            option: config.get(section, option)
            for option in config.options(section)
        }
        for section in config.sections()
    }
203 
204 
def to_bool(value):
    """
    Converts 'something' to boolean. Raises exception for invalid formats
    Possible True values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    normalized = str(value).lower()
    if normalized in {"yes", "y", "true", "t", "1"}:
        return True
    if normalized in {"no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"}:
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
215 
216 
def updateDB2():
    """Scan root-files/Run*.root and record per-run metadata in runInfo.pkl.

    NOTE(review): relies on helpers `runFromFilename`, `getRunStartTime` and
    `isValid` that are not defined in this file — presumably provided by the
    validation modules added to sys.path at runtime; confirm before use.
    """
    import glob  # bug fix: glob was used but never imported at module level

    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            # bug fix: store the validity flag under the run's own entry;
            # the original wrote a single top-level "isValid" key that each
            # file overwrote in turn
            infos[run]["isValid"] = isValid(f)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
234 
235 
def updateDB(run, runInfo):
    """Store `runInfo` for `run` in the local pickle DB (runInfo.pkl).

    An entry already present for that run is kept untouched; the database
    file is created on first use.
    """
    dbName = "runInfo.pkl"

    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    # keep any pre-existing record for this run (same as `if run not in infos`)
    infos.setdefault(run, runInfo)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
249 
250 
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser variant with case-sensitive option names plus helpers to
    read a section merged with its optional "local<Section>" override."""

    def optionxform(self, optionstr):
        # override: keep option names exactly as written in the .ini file
        # (the base class lower-cases them)
        return optionstr


    def exists( self, section, option):
        """Return True if `option` is present in `section`; False also when
        the section itself does not exist."""
        try:
            items = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        for item in items:
            if item[0] == option:
                return True
        return False


    def __updateDict( self, dictionary, section ):
        """Merge every option of `section` — and of "local<Section>" if such
        a section exists (local values win) — into `dictionary`; return it."""
        result = dictionary
        try:
            for option in self.options( section ):
                result[option] = self.get( section, option )
            if "local"+section.title() in self.sections():
                for option in self.options( "local"+section.title() ):
                    result[option] = self.get( "local"+section.title(),option )
        except ConfigParser.NoSectionError as section:
            # NOTE(review): the error message is built but the raise below is
            # commented out, so a missing section is silently tolerated
            msg = ("%s in configuration files. This section is mandatory."
                   %(str(section).replace(":", "", 1)))
            #raise AllInOneError(msg)
        return result


    def getResultingSection( self, section, defaultDict = {}, demandPars = [] ):
        """Return `section` as a dict seeded from a deep copy of `defaultDict`.

        Options listed in `demandPars` are looked up in `section` first and,
        on a miss, in the corresponding "local..." section (for a section
        named "A:B" that is "localA:B"). Finally all remaining options of the
        section (and its local override) are merged in via __updateDict.

        NOTE(review): the mutable default arguments are safe here only
        because they are never mutated (defaultDict is deep-copied).
        """
        result = copy.deepcopy(defaultDict)
        for option in demandPars:
            try:
                result[option] = self.get( section, option )
            except ConfigParser.NoOptionError as globalSectionError:
                # option missing in the global section: derive the name of
                # the "local..." fallback section and retry there
                globalSection = str( globalSectionError ).split( "'" )[-2]
                splittedSectionName = section.split( ":" )
                if len( splittedSectionName ) > 1:
                    localSection = ("local"+section.split( ":" )[0].title()+":"
                                    +section.split(":")[1])
                else:
                    localSection = ("local"+section.split( ":" )[0].title())
                if self.has_section( localSection ):
                    try:
                        result[option] = self.get( localSection, option )
                    except ConfigParser.NoOptionError as option:
                        # message built but not raised (see note above)
                        msg = ("%s. This option is mandatory."
                               %(str(option).replace(":", "", 1).replace(
                                   "section",
                                   "section '"+globalSection+"' or", 1)))
                        #raise AllInOneError(msg)
                else:
                    msg = ("%s. This option is mandatory."
                           %(str(globalSectionError).replace(":", "", 1)))
                    #raise AllInOneError(msg)
        result = self.__updateDict( result, section )
        #print(result)
        return result
313 
314 
def ConfigSectionMap(config, section):
    """Return all options of `section` in `config` as a plain dict.

    A per-option read failure is reported and the option is kept with value
    None instead of aborting the whole map.
    """
    the_dict = {}
    options = config.options(section)
    for option in options:
        try:
            the_dict[option] = config.get(section, option)
            if the_dict[option] == -1:
                # legacy guard: config.get returns strings, so this branch
                # never triggers in practice. The original called the
                # undefined name DebugPrint here (a NameError masked by the
                # bare except); use print to keep the intent without the trap.
                print("skip: %s" % option)
        except Exception:
            # narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # are no longer swallowed
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
327 
328 
def mkdir_eos(out_path):
    """Create `out_path` on EOS level by level via `eos.select mkdir`, then
    verify the final directory with `eos.select ls`, printing the command
    output when the check fails."""
    print("creating", out_path)

    newpath = '/'
    for segment in out_path.split('/'):
        newpath = os.path.join(newpath, segment)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            command = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select mkdir " + newpath
            p = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate()
            p.wait()

    # now check that the directory exists
    command2 = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls " + out_path
    p = subprocess.Popen(command2, shell=True,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    p.wait()
    if p.returncode != 0:
        print(out)
350 
def split(sequence, size):
    """Yield successive chunks of at most `size` elements from `sequence`."""
    start = 0
    while start < len(sequence):
        yield sequence[start:start + size]
        start += size
359 
360 
class Job:
    """One PV-validation workflow job.

    Holds the full per-sample configuration, generates the cmsRun config from
    the PVValidation_T_cfg.py template, writes the LSF and bash submission
    scripts, and submits via `bsub`. All boolean-like settings are kept as
    strings ("True"/"False") because they are spliced verbatim into the
    template — assumption inherited from the callers; confirm before changing.
    """

    def __init__(self,dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir ,the_dir):
        # e.g. "/HLTPhysics/Run2016C-TkAlMinBias-...-v1/ALCARECO"
        # -> "HLTPhysics_Run2016C" (primary dataset + era prefix)
        theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]
        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        # filled by submit() with the raw bsub reply
        self.batch_job_id = None
        self.job_name = job_name

        # template-substitution payload (strings, spliced into the cfg)
        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut
        # working directories
        self.the_dir=the_dir
        self.CMSSW_dir=CMSSW_dir
        # base names used for the cfg, scripts and output ROOT files
        self.output_full_name=self.getOutputBaseName()+"_"+str(self.job_id)
        self.output_number_name=self.getOutputBaseNameWithData()+"_"+str(self.job_number)

        self.cfg_dir=None
        self.outputCfgName=None

        # LSF variables
        self.LSF_dir=None
        self.BASH_dir=None
        self.output_LSF_name=None
        self.output_BASH_name=None
        # input LFNs for this job
        self.lfn_list=list()

    def __del__(self):
        del self.lfn_list

    def setEOSout(self,theEOSdir):
        # EOS destination directory for the job's output files
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Return the output base name built from the job name only."""
        return "PVValidation_"+self.job_name

    def getOutputBaseNameWithData(self):
        """Return the output base name including the dataset tag."""
        return "PVValidation_"+self.job_name+"_"+self.data

    def createTheCfgFile(self,lfn):
        """Write the cmsRun configuration for this job by substituting all
        *TEMPLATE placeholders of PVValidation_T_cfg.py.

        Arguments:
        - `lfn`: list of input logical file names for this job
        """
        global CopyRights
        # write the cfg file

        self.cfg_dir = os.path.join(self.the_dir,"cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName=self.output_full_name+"_cfg.py"
        fout=open(os.path.join(self.cfg_dir,self.outputCfgName),'w')
        template_cfg_file = os.path.join(self.the_dir,"PVValidation_T_cfg.py")
        # NOTE(review): `file` shadows the (py2) builtin; kept as-is
        file = open(template_cfg_file,'r')
        config_txt = '\n\n' + CopyRights + '\n\n'
        config_txt += file.read()
        config_txt=config_txt.replace("ISDATEMPLATE",self.isDA)
        config_txt=config_txt.replace("ISMCTEMPLATE",self.isMC)
        config_txt=config_txt.replace("APPLYBOWSTEMPLATE",self.applyBOWS)
        config_txt=config_txt.replace("EXTRACONDTEMPLATE",self.applyEXTRACOND)
        config_txt=config_txt.replace("USEFILELISTTEMPLATE","True")
        config_txt=config_txt.replace("RUNBOUNDARYTEMPLATE",self.runboundary)
        config_txt=config_txt.replace("LUMILISTTEMPLATE",self.lumilist)
        config_txt=config_txt.replace("MAXEVENTSTEMPLATE",self.maxevents)
        config_txt=config_txt.replace("GLOBALTAGTEMPLATE",self.gt)
        config_txt=config_txt.replace("ALLFROMGTTEMPLATE",self.allFromGT)
        config_txt=config_txt.replace("ALIGNOBJTEMPLATE",self.alignmentDB)
        config_txt=config_txt.replace("GEOMTAGTEMPLATE",self.alignmentTAG)
        config_txt=config_txt.replace("APEOBJTEMPLATE",self.apeDB)
        config_txt=config_txt.replace("ERRORTAGTEMPLATE",self.apeTAG)
        config_txt=config_txt.replace("BOWSOBJECTTEMPLATE",self.bowDB)
        config_txt=config_txt.replace("BOWSTAGTEMPLATE",self.bowTAG)
        config_txt=config_txt.replace("VERTEXTYPETEMPLATE",self.vertextype)
        config_txt=config_txt.replace("TRACKTYPETEMPLATE",self.tracktype)
        config_txt=config_txt.replace("REFITTERTEMPLATE",self.refittertype)
        config_txt=config_txt.replace("TTRHBUILDERTEMPLATE",self.ttrhtype)
        config_txt=config_txt.replace("PTCUTTEMPLATE",self.ptcut)
        config_txt=config_txt.replace("INTLUMITEMPLATE",self.intlumi)
        config_txt=config_txt.replace("RUNCONTROLTEMPLATE",self.applyruncontrol)
        # quote each LFN so the list splices into the cfg as python source
        lfn_with_quotes = map(lambda x: "\'"+x+"\'",lfn)
        config_txt=config_txt.replace("FILESOURCETEMPLATE","["+",".join(lfn_with_quotes)+"]")
        config_txt=config_txt.replace("OUTFILETEMPLATE",self.output_full_name+".root")

        # build the extra-conditions snippet: one PoolDBESSource + ESPrefer
        # per record found in extraCondVect (keys containing "Rcd"), each
        # value being "connect,tag[,label]"
        textToWrite=''
        for element in self.extraCondVect :
            if("Rcd" in element):
                params = self.extraCondVect[element].split(',')
                text = '''\n
process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
     connect = cms.string('{database}'),
     toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
                                tag = cms.string('{tag}'),
                                label = cms.untracked.string('{label}')
                                )
                       )
                )
process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
'''.format(record = element, database = params[0], tag = params[1], label = (params[2] if len(params)>2 else ''))
                textToWrite+=text

        if(self.applyEXTRACOND=="True"):
            if not self.extraCondVect:
                raise Exception('Requested extra conditions, but none provided')

            config_txt=config_txt.replace("END OF EXTRA CONDITIONS",textToWrite)
        else:
            print("INFO: Will not apply any extra conditions")
            pass

        fout.write(config_txt)

        file.close()
        fout.close()

    def createTheLSFFile(self):
        """Write the LSF (bsub) submission script for this job."""
        # directory to store the LSF to be submitted
        self.LSF_dir = os.path.join(self.the_dir,"LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name=self.output_full_name+".lsf"
        fout=open(os.path.join(self.LSF_dir,self.output_LSF_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        # script: set up the CMSSW runtime, run cmsRun, copy outputs to EOS
        fout.write("#!/bin/sh \n")
        fout.write("#BSUB -L /bin/sh\n")
        fout.write("#BSUB -J "+job_name+"\n")
        fout.write("#BSUB -o "+os.path.join(log_dir,job_name+".log")+"\n")
        fout.write("#BSUB -q cmscaf1nd \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=`pwd` \n")
        fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("eval `scram runtime -sh` \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cmsRun "+os.path.join(self.cfg_dir,self.outputCfgName)+" \n")
        fout.write("ls -lh . \n")
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

        fout.close()


    def createTheBashFile(self):
        """Write the bash script executed by HTCondor for this job."""
        # directory to store the BASH to be submitted
        self.BASH_dir = os.path.join(self.the_dir,"BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name=self.output_number_name+".sh"
        fout=open(os.path.join(self.BASH_dir,self.output_BASH_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        # script: export the forwarded proxy, set up CMSSW, run cmsRun from
        # the batch scratch dir, then copy ROOT outputs to EOS
        fout.write("#!/bin/bash \n")
        #fout.write("export EOS_MGM_URL=root://eoscms.cern.ch \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("echo \"Job started at \" `date` \n")
        fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=$PWD \n")
        #fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("cd ${CMSSW_DIR} \n")
        fout.write("eval `scramv1 runtime -sh` \n")
        fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cp "+os.path.join(self.cfg_dir,self.outputCfgName)+" . \n")
        fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
        fout.write("cmsRun "+self.outputCfgName+" \n")
        fout.write("echo \"Content of working dir is \"`ls -lh` \n")
        #fout.write("less condor_exec.exe \n")
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        #fout.write("mv ${JobName}.out ${CMSSW_DIR}/BASH \n")
        fout.write("echo \"Job ended at \" `date` \n")
        fout.write("exit 0 \n")

        fout.close()

    def getOutputFileName(self):
        """Return the full EOS path of this job's output ROOT file."""
        return os.path.join(self.OUTDIR,self.output_full_name+".root")

    def submit(self):
        """Make the LSF script executable and submit it with bsub, storing
        the raw bsub reply in self.batch_job_id."""
        print("submit job", self.job_id)
        job_name = self.output_full_name
        submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir,self.output_LSF_name)
        child1 = os.system(submitcommand1)
        #submitcommand2 = "bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name)
        #child2 = os.system(submitcommand2)
        self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name))

    def getBatchjobId(self):
        """Extract the numeric job id from the bsub reply
        (format: 'Job <id> is submitted to queue ...')."""
        return self.batch_job_id.split("<")[1].split(">")[0]
604 
605 
606 def main():
608 
609 
610  if not check_proxy():
611  print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
612  sys.exit(1)
613 
614 
615  forward_proxy(".")
616 
617  global CopyRights
618  print('\n'+CopyRights)
619 
620  HOME = os.environ.get('HOME')
622  # CMSSW section
623  input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
624  AnalysisStep_dir = os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test")
625  lib_path = os.path.abspath(AnalysisStep_dir)
626  sys.path.append(lib_path)
627 
628 
629  srcFiles = []
631  desc="""This is a description of %prog."""
632  parser = OptionParser(description=desc,version='%prog version 0.1')
633  parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
634  parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
635  parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
636  parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
637  parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
638  parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
639  parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
640  parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
641  parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
642  parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
643  (opts, args) = parser.parse_args()
645  now = datetime.datetime.now()
646  #t = now.strftime("test_%Y_%m_%d_%H_%M_%S_")
647  #t = "2016UltraLegacy"
648  #t = "2017UltraLegacy"
649  #t = "2018UltraLegacy"
650  t=""
651  t+=opts.taskname
652 
653  USER = os.environ.get('USER')
654  eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
655 
656  if opts.submit:
657  mkdir_eos(eosdir)
658  else:
659  print("Not going to create EOS folder. -s option has not been chosen")
660 
661 
662 
663  jobName = []
664  isMC = []
665  isDA = []
666  doRunBased = []
667  maxevents = []
668 
669  gt = []
670  allFromGT = []
671  applyEXTRACOND = []
672  extraCondVect = []
673  alignmentDB = []
674  alignmentTAG = []
675  apeDB = []
676  apeTAG = []
677  applyBOWS = []
678  bowDB = []
679  bowTAG = []
680  conditions = []
681 
682  vertextype = []
683  tracktype = []
684  refittertype = []
685  ttrhtype = []
686 
687  applyruncontrol = []
688  ptcut = []
689  runboundary = []
690  lumilist = []
691 
692  ConfigFile = opts.inputconfig
693 
694  if ConfigFile is not None:
695 
696  print("********************************************************")
697  print("* Parsing from input file:", ConfigFile," ")
698 
699  config = BetterConfigParser()
700  config.read(ConfigFile)
701 
702  print("Parsed the following configuration \n\n")
703  inputDict = as_dict(config)
704  pprint.pprint(inputDict)
705 
706  if(not bool(inputDict)):
707  raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
708 
709  #print config.sections()
710 
711  # please notice: since in principle one wants to run on several different samples simultaneously,
712  # all these inputs are vectors
713 
714  doRunBased = opts.doRunBased
715 
716  listOfValidations = config.getResultingSection("validations")
717 
718  for item in listOfValidations:
719  if (bool(listOfValidations[item]) == True):
720 
721  jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
722  isDA.append(ConfigSectionMap(config,"Job")['isda'])
723  isMC.append(ConfigSectionMap(config,"Job")['ismc'])
724  maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
725 
726  gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
727  allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
728  applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
729  conditions.append(config.getResultingSection("ExtraConditions"))
730 
731  alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
732  alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
733  apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
734  apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
735  applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
736  bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
737  bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
738 
739  vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
740  tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
741 
742 
743 
744  if(config.exists("Refit","refittertype")):
745  refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
746  else:
747  refittertype.append(str(RefitType.COMMON))
748 
749  if(config.exists("Refit","ttrhtype")):
750  ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
751  else:
752  ttrhtype.append("WithAngleAndTemplate")
753 
754  applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
755  ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
756  runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
757  lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
758  else :
759 
760  print("********************************************************")
761  print("* Parsing from command line *")
762  print("********************************************************")
763 
764  jobName = ['testing']
765  isDA = ['True']
766  isMC = ['True']
767  doRunBased = opts.doRunBased
768  maxevents = ['10000']
769 
770  gt = ['74X_dataRun2_Prompt_v4']
771  allFromGT = ['False']
772  applyEXTRACOND = ['False']
773  conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
774  alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
775  alignmentTAG = ['TrackerAlignment_Prompt']
776  apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
777  apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
778  applyBOWS = ['True']
779  bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
780  bowTAG = ['TrackerSurafceDeformations_v1_express']
781 
782  vertextype = ['offlinePrimaryVertices']
783  tracktype = ['ALCARECOTkAlMinBias']
784 
785  applyruncontrol = ['False']
786  ptcut = ['3']
787  runboundary = ['1']
788  lumilist = ['']
789 
790  # print some of the configuration
791 
792  print("********************************************************")
793  print("* Configuration info *")
794  print("********************************************************")
795  print("- submitted : ",opts.submit)
796  print("- taskname : ",opts.taskname)
797  print("- Jobname : ",jobName)
798  print("- use DA : ",isDA)
799  print("- is MC : ",isMC)
800  print("- is run-based: ",doRunBased)
801  print("- evts/job : ",maxevents)
802  print("- GlobatTag : ",gt)
803  print("- allFromGT? : ",allFromGT)
804  print("- extraCond? : ",applyEXTRACOND)
805  print("- extraCond : ",conditions)
806  print("- Align db : ",alignmentDB)
807  print("- Align tag : ",alignmentTAG)
808  print("- APE db : ",apeDB)
809  print("- APE tag : ",apeTAG)
810  print("- use bows? : ",applyBOWS)
811  print("- K&B db : ",bowDB)
812  print("- K&B tag : ",bowTAG)
813  print("- VertexColl : ",vertextype)
814  print("- TrackColl : ",tracktype)
815  print("- RefitterSeq : ",refittertype)
816  print("- TTRHBuilder : ",ttrhtype)
817  print("- RunControl? : ",applyruncontrol)
818  print("- Pt> ",ptcut)
819  print("- run= ",runboundary)
820  print("- JSON : ",lumilist)
821  print("- Out Dir : ",eosdir)
822 
823  print("********************************************************")
824  print("Will run on",len(jobName),"workflows")
825 
826  myRuns = []
827  mylist = {}
829  if(doRunBased):
830  print(">>>> This is Data!")
831  print(">>>> Doing run based selection")
832  cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
833  p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
834  out, err = p.communicate()
835  #print(out)
836  listOfRuns=out.decode().split("\n")
837  listOfRuns.pop()
838  listOfRuns.sort()
839  print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
840 
841  mytuple=[]
843  print("first run:",opts.start,"last run:",opts.end)
844 
845  for run in listOfRuns:
846  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
847  print("excluding",run)
848  continue
849 
850  if not isInJSON(run,lumilist[0]):
851  continue
852 
853  else:
854  print("'======> taking",run)
855  #print "preparing run",run
856  #if(int(run)%100==0):
857  mytuple.append((run,opts.data))
858 
859  #print mytuple
860 
861  instances=[opts.instance for entry in mytuple]
862  pool = multiprocessing.Pool(processes=20) # start 20 worker processes
863  count = pool.map(getFilesForRun,zip(mytuple,instances))
864  file_info = dict(zip(listOfRuns, count))
866  #print file_info
867 
868  for run in listOfRuns:
869  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
870  print('rejecting run',run,' becasue outside of boundaries')
871  continue
872 
873  if not isInJSON(run,lumilist[0]):
874  print('rejecting run',run,' becasue outside not in JSON')
875  continue
876 
877  #if(int(run)%100==0):
878  # print "preparing run",run
879  myRuns.append(run)
880  #cmd2 = ' das_client --limit=0 --query \'file run='+run+' dataset='+opts.data+'\''
881  #q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
882  #out2, err2 = q.communicate()
883 
884  #out2=getFilesForRun((run,opts.data))
885  #print out2
886  #pool.map(getFilesForRun,run,opts.data)
887 
888 
889  #if run in file_info:
890  #mylist[run] = file_info[run]
891  #print run,mylist[run]
892  #mylist[run] = out2.split('\n')
893  #print mylist
894  #mylist[run].pop()
895  #print mylist
896 
897  od = collections.OrderedDict(sorted(file_info.items()))
898  # print od
899 
900 
901  if(len(myRuns)==0):
902  if(opts.isUnitTest):
903  print('\n')
904  print('=' * 70)
905  print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
906  print('=' * 70)
907  print('\n')
908  sys.exit(0)
909  else:
910  raise Exception('Will not run on any run.... please check again the configuration')
911  else:
912  # get from the DB the int luminosities
913  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
915  if(opts.verbose):
916  pprint.pprint(myLumiDB)
917 
918  # start loop on samples
919  for iConf in range(len(jobName)):
920  print("This is Task n.",iConf+1,"of",len(jobName))
921 
922 
923 
924  # for hadd script
925  scripts_dir = os.path.join(AnalysisStep_dir,"scripts")
926  if not os.path.exists(scripts_dir):
927  os.makedirs(scripts_dir)
928  hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
929  fout = open(hadd_script_file,'w')
930 
931  output_file_list1=list()
932  output_file_list2=list()
933  output_file_list2.append("hadd ")
934 
935  inputFiles = []
937  if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
938  if(to_bool(isMC[iConf])):
939  print("this is MC")
940  cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
941  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
942  out,err = s.communicate()
943  mylist = out.decode().split('\n')
944  mylist.pop()
945  #print mylist
946 
947  splitList = split(mylist,10)
948  for files in splitList:
949  inputFiles.append(files)
950  myRuns.append(str(1))
951  else:
952  print("this is DATA (not doing full run-based selection)")
953  print(runboundary[iConf])
954  cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
955  #print cmd
956  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
957  out,err = s.communicate()
958  #print(out)
959  mylist = out.decode().split('\n')
960  mylist.pop()
961  #print "len(mylist):",len(mylist)
962  print("mylist:",mylist)
963 
964  splitList = split(mylist,10)
965  for files in splitList:
966  inputFiles.append(files)
967  myRuns.append(str(runboundary[iConf]))
968 
969  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)
970 
971  else:
972  #pass
973  for element in od:
974  #print mylist[element]
975  inputFiles.append(od[element])
976  #print element,od[element]
977  #print mylist
978 
979  #print inputFiles
980 
981 
982  batchJobIds = []
983  mergedFile = None
985  if(opts.verbose):
986  print("myRuns =====>",myRuns)
987 
988  totalJobs=0
989  theBashDir=None
990  theBaseName=None
992  for jobN,theSrcFiles in enumerate(inputFiles):
993  if(opts.verbose):
994  print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
995  else:
996  print("JOB:",jobN,"run",myRuns[jobN])
997  thejobIndex=None
998  theLumi='1'
1000  #if(to_bool(isMC[iConf]) and (not to_bool(doRunBased))):
1001  if(to_bool(isMC[iConf])):
1002  thejobIndex=jobN
1003  else:
1004  if(doRunBased):
1005  thejobIndex=myRuns[jobN]
1006  else:
1007  thejobIndex=myRuns[jobN]+"_"+str(jobN)
1008 
1009  if (myRuns[jobN]) in myLumiDB:
1010  theLumi = myLumiDB[myRuns[jobN]]
1011  else:
1012  print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
1013  theLumi='1'
1014  print("int. lumi:",theLumi,"/pb")
1015 
1016  #print 'the configuration is:',iConf,' theJobIndex is:',thejobIndex
1017  #print applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf]
1018 
1019  runInfo = {}
1020  runInfo["run"] = myRuns[jobN]
1021  #runInfo["runevents"] = getNEvents(myRuns[jobN],opts.data)
1022  runInfo["conf"] = jobName[iConf]
1023  runInfo["gt"] = gt[iConf]
1024  runInfo["allFromGT"] = allFromGT[iConf]
1025  runInfo["alignmentDB"] = alignmentDB[iConf]
1026  runInfo["alignmentTag"] = alignmentTAG[iConf]
1027  runInfo["apeDB"] = apeDB[iConf]
1028  runInfo["apeTag"] = apeTAG[iConf]
1029  runInfo["applyBows"] = applyBOWS[iConf]
1030  runInfo["bowDB"] = bowDB[iConf]
1031  runInfo["bowTag"] = bowTAG[iConf]
1032  runInfo["ptCut"] = ptcut[iConf]
1033  runInfo["lumilist"] = lumilist[iConf]
1034  runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
1035  runInfo["conditions"] = conditions[iConf]
1036  runInfo["nfiles"] = len(theSrcFiles)
1037  runInfo["srcFiles"] = theSrcFiles
1038  runInfo["intLumi"] = theLumi
1039 
1040  updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1041 
1042  totalJobs=totalJobs+1
1043 
1044  aJob = Job(opts.data,
1045  jobN,
1046  thejobIndex,
1047  jobName[iConf],isDA[iConf],isMC[iConf],
1048  applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1049  myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1050  gt[iConf],allFromGT[iConf],
1051  alignmentDB[iConf], alignmentTAG[iConf],
1052  apeDB[iConf], apeTAG[iConf],
1053  bowDB[iConf], bowTAG[iConf],
1054  vertextype[iConf], tracktype[iConf],
1055  refittertype[iConf], ttrhtype[iConf],
1056  applyruncontrol[iConf],
1057  ptcut[iConf],input_CMSSW_BASE,AnalysisStep_dir)
1058 
1059  aJob.setEOSout(eosdir)
1060  aJob.createTheCfgFile(theSrcFiles)
1061  aJob.createTheBashFile()
1062 
1063  output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
1064  if jobN == 0:
1065  theBashDir=aJob.BASH_dir
1066  theBaseName=aJob.getOutputBaseNameWithData()
1067  mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
1068  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
1069  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
1070  del aJob
1071 
1072  job_submit_file = write_HTCondor_submit_file(theBashDir,theBaseName,totalJobs,None)
1073 
1074  if opts.submit:
1075  os.system("chmod u+x "+theBashDir+"/*.sh")
1076  submissionCommand = "condor_submit "+job_submit_file
1077  submissionOutput = getCommandOutput(submissionCommand)
1078  print(submissionOutput)
1079 
1080  fout.write("#!/bin/bash \n")
1081  fout.write("MAIL=$USER@mail.cern.ch \n")
1082  fout.write("OUT_DIR="+eosdir+"\n")
1083  fout.write("FILE="+str(mergedFile)+"\n")
1084  fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1085  fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
1086  fout.write("eval `scram r -sh` \n")
1087  fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
1088  fout.writelines(output_file_list1)
1089  fout.writelines(output_file_list2)
1090  fout.write("\n")
1091  fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1092  fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1093  fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")
1094 
1095  os.system("chmod u+x "+hadd_script_file)
1096 
1097  harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
1098  print(harvest_conditions)
1099  lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
1100  print(lastJobCommand)
1101  if opts.submit:
1102  lastJobOutput = getCommandOutput(lastJobCommand)
1103  print(lastJobOutput)
1104 
1105  fout.close()
1106  del output_file_list1
1107 
1108 
# Script entry point: invoke main() only when executed directly, not on import.
1109 if __name__ == "__main__":
1110  main()
1111 
1112 
1113 
1114 
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def mkdir_eos(out_path)
method to recursively create directories on EOS #############
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)
Definition: main.py:1
#define str(s)
def setEOSout(self, theEOSdir)