CMS 3D CMS Logo

submitPVValidationJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
4 usage:
5 
6 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
7 '''
8 
9 from __future__ import print_function
10 from builtins import range
11 
12 __author__ = 'Marco Musich'
13 __copyright__ = 'Copyright 2020, CERN CMS'
14 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
15 __license__ = 'Unknown'
16 __maintainer__ = 'Marco Musich'
17 __email__ = 'marco.musich@cern.ch'
18 __version__ = 1
19 
20 import datetime,time
21 import os,sys
22 import copy
23 import pickle
24 import string, re
25 import configparser as ConfigParser
26 import json
27 import pprint
28 import subprocess
29 from optparse import OptionParser
30 from subprocess import Popen, PIPE
31 import collections
32 import warnings
33 import shutil
34 import multiprocessing
35 from enum import Enum
36 
class RefitType(Enum):
    """Kind of track-refit sequence (see the ``refittertype`` setting).

    In the visible code the enum is only used through its textual form:
    ``str(RefitType.COMMON)`` == ``"RefitType.COMMON"`` is the default
    refitter when the configuration does not provide one.
    """

    # explicit values, kept stable because the textual/numeric form may be
    # consumed by the generated configuration
    STANDARD = 1
    COMMON = 2
40 
# Banner prepended to every generated cmsRun configuration and printed at start-up.
CopyRights = ''.join(banner_line + '\n' for banner_line in (
    '##################################',
    '# submitPVValidationJobs.py #',
    '# marco.musich@cern.ch #',
    '# April 2020 #',
    '##################################',
))
46 
47 
def check_proxy():
    """Check if a GRID proxy has been initialized.

    Runs ``voms-proxy-info --exists`` with stdout/stderr discarded.

    Returns:
        True if the command exits with status 0; False if it exits with a
        non-zero status or cannot be executed at all (e.g. the tool is not
        installed or not on ``PATH``).
    """
    try:
        subprocess.check_call(["voms-proxy-info", "--exists"],
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return False
    except OSError:
        # Missing/unrunnable executable: there is no usable proxy tooling, so
        # report "no proxy" instead of crashing with a traceback.
        return False
    return True
59 
60 
def forward_proxy(rundir):
    """Forward the proxy to a location visible from the batch system.

    Exits the program with status 1 (after printing instructions) if no
    proxy is available. Otherwise copies the file reported by
    ``voms-proxy-info --path`` to ``<rundir>/.user_proxy``.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    # check_output returns bytes: decode so the source path is a str like the
    # destination path (avoids mixing bytes and str paths in shutil).
    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).decode().strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))
74 
75 
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Write the HTCondor submit description ``job_<name>.submit`` in `path`.

    Arguments:
    - `path`: job directory. The executables are ``<path>/<name>_$(ProcId).sh``.
      The .out/.err/.log files go to the absolute version of this directory.
    - `name`: base name of the job scripts and of the submit file
    - `nruns`: number of jobs to queue (one per ProcId)
    - `proxy_path`: path to the proxy (only used in case of requested proxy forward)

    Returns the path of the written submit file.
    """
    job_submit_template = """\
universe = vanilla
requirements = (OpSysAndVer =?= "CentOS7")
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
"""
    if proxy_path is not None:
        # Must come before `queue`: HTCondor only applies the commands that
        # precede a `queue` statement to the jobs that statement submits.
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""
    job_submit_template += """\
queue {njobs:s}
"""

    job_submit_file = os.path.join(path, "job_" + name + ".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script=os.path.join(path, name + "_$(ProcId).sh"),
                                           out=name + "_$(ProcId)",
                                           jobm=os.path.abspath(path),
                                           flavour="tomorrow",
                                           njobs=str(nruns),
                                           proxy=proxy_path))

    return job_submit_file
111 
112 
def getCommandOutput(command):
    """Run `command` through the shell and return its standard output.

    Arguments:
    - `command`: shell command to be invoked by this function (shell syntax
      such as redirections is allowed).

    Standard error is not captured and goes to this process's stderr. If the
    command exits with a non-zero status, a message with that status is
    printed; the (possibly partial) output is returned anyway.
    """
    proc = subprocess.run(command, shell=True, stdout=subprocess.PIPE,
                          universal_newlines=True)
    if proc.returncode != 0:
        # returncode is the real exit status (negative if killed by a signal),
        # unlike os.popen().close(), which returns it shifted left by 8 bits.
        print('%s failed w/ exit code %d' % (command, proc.returncode))
    return proc.stdout
125 
126 
def getFilesForRun(blob):
    """Return the files DAS lists for one run of a dataset.

    Arguments:
    - `blob`: ``((run, dataset), instance)``. `run` and `dataset` are strings;
      `instance` is a DAS instance name or None for the default. Packing the
      arguments lets the function be used directly with ``Pool.map``.

    Returns the non-empty lines printed by ``dasgoclient``. Returns an empty
    list if the query yields nothing or ``dasgoclient`` cannot be executed.
    """
    run, dataset = blob[0][0], blob[0][1]
    instance = blob[1]

    query = 'file run=' + run + ' dataset=' + dataset
    if instance is not None:
        query += ' instance=' + instance

    # argument list instead of a shell string: no quoting issues with the query
    try:
        proc = Popen(['dasgoclient', '-limit=0', '-query', query], stdout=PIPE, stderr=PIPE)
    except OSError:
        return []
    out, _ = proc.communicate()
    # splitlines + filter instead of split('\n') + pop(): pop() dropped a real
    # file name whenever the output did not end with a newline
    return [line for line in out.decode().splitlines() if line]
137 
138 
def getNEvents(run, dataset):
    """Return the number of events DAS reports for `run` in `dataset`.

    Queries the legacy ``das_client`` tool for ``summary.nevents``. An empty
    answer (``[]``) or no output at all yields 0. Raises CalledProcessError if
    the tool fails.
    """
    output = subprocess.check_output(["das_client", "--limit", "0", "--query",
                                      "summary run={} dataset={} | grep summary.nevents".format(run, dataset)])
    # check_output returns bytes: decode before comparing with text, otherwise
    # the "[]" check never matches and int() is fed b"[]".
    nEvents = output.decode().strip()
    return 0 if nEvents in ("", "[]") else int(nEvents)
143 
144 
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Query brilcalc for the per-run luminosity between two runs.

    Runs ``<homedir>/.local/bin/brilcalc lumi`` with the following options:
    ``-b "STABLE BEAMS" -u /pb --begin <minRun> --end <maxRun>
    --output-style csv -c web``.

    The CSV output is parsed line by line. Lines containing ``#`` (headers and
    summary) and blank lines are skipped. For every other line:
    - the run is the first CSV field up to ``:`` (``run:fill``);
    - the luminosity is the last CSV field, which should be the recorded
      luminosity in /pb. It is kept as a string with ``\\r`` removed.

    Arguments:
    - `homedir`: home directory that contains ``.local/bin/brilcalc``
    - `minRun`, `maxRun`: first and last run of the query
    - `isRunBased`: if false, nothing is queried and ``{}`` is returned
    - `verbose`: print the raw output and the resulting dictionary

    Returns a dict ``{run (str): luminosity (str)}``. Returns ``{}`` if the
    query is skipped or brilcalc cannot be run; a warning is issued in the
    latter case.
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        output = subprocess.check_output([homedir + "/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
                                          "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
                                          "--output-style", "csv", "-c", "web"])
    except Exception:
        # Best effort: any failure (missing tool, bad HOME, non-zero exit)
        # just means no luminosity information, but Ctrl-C is not swallowed.
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # skip header/summary lines (they contain '#') and blank lines, which
        # previously produced a bogus '' entry
        if "#" in line or not line.strip():
            continue
        fields = line.split(",")
        runToCache = fields[0].split(":")[0]
        lumiToCache = fields[-1].replace("\r", "")
        myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
182 
183 
def isInJSON(run, jsonfile):
    """Return True if `run` is a top-level key of the JSON lumi mask `jsonfile`.

    `run` must have the same type as the JSON keys. The callers in this script
    pass run numbers as strings.

    If the file cannot be opened or parsed (including an empty path), a
    warning is issued and True is returned, so that no run is discarded.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
    except (OSError, TypeError, ValueError):
        # OSError: missing/unreadable file; TypeError: no path (None);
        # ValueError: not valid JSON
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
    return run in jsonDATA
192 
193 
def as_dict(config):
    """Return `config` as a nested ``{section: {option: value}}`` dictionary.

    Values are obtained through ``config.get`` (i.e. interpolated strings).
    Sections follow ``config.sections()`` and options follow
    ``config.options(section)``.
    """
    return {
        section: {option: config.get(section, option) for option in config.options(section)}
        for section in config.sections()
    }
203 
204 
def to_bool(value):
    """Convert `value` to a boolean, raising Exception for unknown formats.

    The decision is taken on ``str(value).lower()``:
    - True for "yes", "y", "true", "t", "1"
    - False for "no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}".
      This covers 0, 0.0, False, None, [] and {} as well as their spellings.
    """
    verdicts = dict.fromkeys(("yes", "y", "true", "t", "1"), True)
    verdicts.update(dict.fromkeys(("no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"), False))

    verdict = verdicts.get(str(value).lower())
    if verdict is None:
        raise Exception('Invalid value for boolean conversion: ' + str(value))
    return verdict
215 
216 
def updateDB2():
    """Refresh the ``runInfo.pkl`` cache from the files ``root-files/Run*.root``.

    For each file, the run is obtained with ``runFromFilename``. Runs not yet
    cached get a ``start_time`` from ``getRunStartTime``. Every run's
    ``isValid`` flag is (re)computed with ``isValid``. The dictionary is then
    pickled back to ``runInfo.pkl`` in the current working directory.

    NOTE(review): ``runFromFilename``, ``getRunStartTime`` and ``isValid`` are
    not defined in the visible part of this script. Confirm they exist before
    relying on this function with non-empty input.
    """
    import glob  # glob is not among this script's top-level imports

    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        # pickle is only safe on trusted input; this is a locally written cache
        with open(dbName, 'rb') as db:
            infos = pickle.load(db)

    for rootFile in glob.glob("root-files/Run*.root"):
        run = runFromFilename(rootFile)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
        # store the flag per run (it used to overwrite one top-level
        # "isValid" key on every iteration)
        infos[run]["isValid"] = isValid(rootFile)

    with open(dbName, "wb") as db:
        pickle.dump(infos, db)
234 
235 
def updateDB(run, runInfo):
    """Record `runInfo` for `run` in the ``runInfo.pkl`` cache.

    The cache is a pickled dict in the current working directory; it is
    created if missing. An entry that already exists for `run` is kept
    unchanged. The file is always rewritten.
    """
    dbName = "runInfo.pkl"

    cached = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as handle:
            cached = pickle.load(handle)

    cached.setdefault(run, runInfo)

    with open(dbName, "wb") as handle:
        pickle.dump(cached, handle)
249 
250 
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser with case-sensitive option names and ``local<Section>``
    override sections."""

    def optionxform(self, optionstr):
        # keep option names exactly as written (the default lower-cases them)
        return optionstr

    def exists(self, section, option):
        """Return True if `option` is defined in `section` (or in DEFAULT).

        Returns False if `section` does not exist. Unlike iterating over
        ``items()``, this does not interpolate values, so a bad value
        elsewhere in the section cannot make the check raise.
        """
        return self.has_option(section, option)

    def __updateDict(self, dictionary, section):
        """Copy the options of `section` into `dictionary`, then overlay those
        of ``"local" + section.title()`` if that section exists.

        `dictionary` is modified in place and returned. A missing `section`
        is tolerated; the dictionary is returned as filled so far.
        """
        result = dictionary
        localSection = "local" + section.title()
        try:
            for option in self.options(section):
                result[option] = self.get(section, option)
            if localSection in self.sections():
                for option in self.options(localSection):
                    result[option] = self.get(localSection, option)
        except ConfigParser.NoSectionError:
            # Nominally mandatory, but deliberately not fatal here.
            pass
        return result

    def getResultingSection(self, section, defaultDict=None, demandPars=None):
        """Return the options of `section` merged on top of `defaultDict`.

        Arguments:
        - `section`: section name, optionally of the form ``<name>:<qualifier>``
        - `defaultDict`: starting values. They are deep-copied and the
          argument is never modified. None means no defaults.
        - `demandPars`: options read first. If one is missing from `section`,
          it is taken from ``local<Name>[:<qualifier>]`` when that section
          has it; otherwise it is skipped.

        Afterwards all options of `section` and of
        ``"local" + section.title()`` are copied in. Raises NoSectionError
        only if `demandPars` is non-empty and `section` does not exist.
        """
        result = copy.deepcopy(defaultDict) if defaultDict is not None else {}
        for option in (demandPars or []):
            try:
                result[option] = self.get(section, option)
            except ConfigParser.NoOptionError:
                splittedSectionName = section.split(":")
                localSection = "local" + splittedSectionName[0].title()
                if len(splittedSectionName) > 1:
                    localSection += ":" + splittedSectionName[1]
                if self.has_section(localSection):
                    try:
                        result[option] = self.get(localSection, option)
                    except ConfigParser.NoOptionError:
                        # mandatory option missing in both sections: tolerated
                        pass
        return self.__updateDict(result, section)
313 
314 
def ConfigSectionMap(config, section):
    """Return all options of `section` as an ``{option: value}`` dict.

    A value that cannot be retrieved (e.g. because of an interpolation error)
    is reported on stdout and stored as None. Raises NoSectionError if
    `section` does not exist.
    """
    the_dict = {}
    for option in config.options(section):
        try:
            the_dict[option] = config.get(section, option)
        except ConfigParser.Error:
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
327 
328 
def mkdir_eos(out_path):
    """Create `out_path` on EOS one directory level at a time.

    ``eos mkdir`` is issued only for the path prefixes containing
    ``test_out``, so nothing is created at the very top of the tree.
    ``eos ls`` then checks the final directory. If that check fails, both
    its stdout and its stderr are printed.
    """
    print("creating", out_path)
    eos = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select"

    newpath = '/'
    for component in out_path.split('/'):
        newpath = os.path.join(newpath, component)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            p = subprocess.Popen(eos + " mkdir " + newpath, shell=True,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            p.communicate()  # waits for the process; output not needed

    # now check that the directory exists
    p = subprocess.Popen(eos + " ls " + out_path, shell=True,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    if p.returncode != 0:
        print(out)
        # the reason for the failure is reported on stderr
        print(err)
350 
def split(sequence, size):
    """Yield consecutive slices of `sequence`, each of at most `size` items.

    The last slice may be shorter. An empty `sequence` yields nothing.
    """
    yield from (sequence[start:start + size] for start in range(0, len(sequence), size))
359 
360 
class Job:
    """One PV-validation batch job.

    Stores the job settings and writes the files needed to run it: the
    cmsRun configuration, an LSF script and a bash script.
    """

    def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS,
                 applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi,
                 maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG,
                 bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype,
                 applyruncontrol, ptcut, CMSSW_dir, the_dir):
        """Store the job settings and derive the output names.

        The template settings (isDA ... ptcut) are later substituted verbatim
        into the cmsRun template, so they must be strings. `extraconditions`
        maps record names to ``"database,tag[,label]"`` strings.
        """
        # "/<primary>/<processed-...>/<tier>" -> "<primary>_<processed>",
        # e.g. "/HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO" -> "HLTPhysics_Run2016C"
        theDataSet = dataset.split("/")[1] + "_" + (dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir = the_dir
        self.CMSSW_dir = CMSSW_dir

        # cfg/output names use job_id; the bash script name uses job_number + dataset
        self.output_full_name = self.getOutputBaseName() + "_" + str(self.job_id)
        self.output_number_name = self.getOutputBaseNameWithData() + "_" + str(self.job_number)

        self.cfg_dir = None
        self.outputCfgName = None

        # LSF variables
        self.LSF_dir = None
        self.BASH_dir = None
        self.output_LSF_name = None
        self.output_BASH_name = None

        self.lfn_list = list()

    def setEOSout(self, theEOSdir):
        """Set the EOS output directory (must be called before writing the scripts)."""
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Return ``"PVValidation_<job_name>"``."""
        return "PVValidation_" + self.job_name

    def getOutputBaseNameWithData(self):
        """Return ``"PVValidation_<job_name>_<data>"``."""
        return "PVValidation_" + self.job_name + "_" + self.data

    def createTheCfgFile(self, lfn):
        """Write the cmsRun configuration for this job.

        Reads ``<CMSSW_dir>/src/Alignment/OfflineValidation/test/PVValidation_T_cfg.py``
        and prepends the CopyRights banner. Every ``...TEMPLATE`` placeholder is
        replaced with the corresponding job setting. The input files are set to
        `lfn` and the output to ``<output_full_name>.root``. The result is
        written to ``<the_dir>/cfg/<output_full_name>_cfg.py``.

        Entries of the extra conditions whose key contains "Rcd" become a
        PoolDBESSource + ESPrefer snippet built from ``database,tag[,label]``.
        The snippets replace "END OF EXTRA CONDITIONS" only when
        applyEXTRACOND == "True".

        Arguments:
        - `lfn`: iterable of input file names (written as a list of quoted strings)

        Raises:
        - Exception if extra conditions are requested but none are provided.
        """
        self.cfg_dir = os.path.join(self.the_dir, "cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName = self.output_full_name + "_cfg.py"

        template_cfg_file = os.path.join(self.CMSSW_dir, "src/Alignment/OfflineValidation/test", "PVValidation_T_cfg.py")
        with open(template_cfg_file, 'r') as template:
            config_txt = '\n\n' + CopyRights + '\n\n' + template.read()

        lfn_with_quotes = ",".join("'" + x + "'" for x in lfn)

        # applied in this order (same as the original sequence of replace calls)
        replacements = [
            ("ISDATEMPLATE", self.isDA),
            ("ISMCTEMPLATE", self.isMC),
            ("APPLYBOWSTEMPLATE", self.applyBOWS),
            ("EXTRACONDTEMPLATE", self.applyEXTRACOND),
            ("USEFILELISTTEMPLATE", "True"),
            ("RUNBOUNDARYTEMPLATE", self.runboundary),
            ("LUMILISTTEMPLATE", self.lumilist),
            ("MAXEVENTSTEMPLATE", self.maxevents),
            ("GLOBALTAGTEMPLATE", self.gt),
            ("ALLFROMGTTEMPLATE", self.allFromGT),
            ("ALIGNOBJTEMPLATE", self.alignmentDB),
            ("GEOMTAGTEMPLATE", self.alignmentTAG),
            ("APEOBJTEMPLATE", self.apeDB),
            ("ERRORTAGTEMPLATE", self.apeTAG),
            ("BOWSOBJECTTEMPLATE", self.bowDB),
            ("BOWSTAGTEMPLATE", self.bowTAG),
            ("VERTEXTYPETEMPLATE", self.vertextype),
            ("TRACKTYPETEMPLATE", self.tracktype),
            ("REFITTERTEMPLATE", self.refittertype),
            ("TTRHBUILDERTEMPLATE", self.ttrhtype),
            ("PTCUTTEMPLATE", self.ptcut),
            ("INTLUMITEMPLATE", self.intlumi),
            ("RUNCONTROLTEMPLATE", self.applyruncontrol),
            ("FILESOURCETEMPLATE", "[" + lfn_with_quotes + "]"),
            ("OUTFILETEMPLATE", self.output_full_name + ".root"),
        ]
        for placeholder, value in replacements:
            config_txt = config_txt.replace(placeholder, value)

        textToWrite = ''
        for element in self.extraCondVect:
            if "Rcd" in element:
                params = self.extraCondVect[element].split(',')
                # NOTE: the leading whitespace of these lines ends up in the
                # generated python cfg; it must fit the indentation around
                # "END OF EXTRA CONDITIONS" in the template.
                text = '''\n
          process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
               connect = cms.string('{database}'),
               toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
                                          tag = cms.string('{tag}'),
                                          label = cms.untracked.string('{label}')
                                         )
                                )
              )
          process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
        '''.format(record=element, database=params[0], tag=params[1], label=(params[2] if len(params) > 2 else ''))
                textToWrite += text

        if self.applyEXTRACOND == "True":
            if not self.extraCondVect:
                raise Exception('Requested extra conditions, but none provided')

            config_txt = config_txt.replace("END OF EXTRA CONDITIONS", textToWrite)
        else:
            print("INFO: Will not apply any extra conditions")

        # write only once the text is complete, so a failure above does not
        # leave an open handle or a half-written cfg behind
        with open(os.path.join(self.cfg_dir, self.outputCfgName), 'w') as fout:
            fout.write(config_txt)

    def createTheLSFFile(self):
        """Write the LSF script ``<the_dir>/LSF/<output_full_name>.lsf``.

        Also creates ``<the_dir>/log``. Requires createTheCfgFile() (for the
        cfg path) and setEOSout() (for OUT_DIR) to have been called.
        """
        # directory to store the LSF to be submitted
        self.LSF_dir = os.path.join(self.the_dir, "LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name = self.output_full_name + ".lsf"
        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir, "log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        lines = [
            "#!/bin/sh \n",
            "#BSUB -L /bin/sh\n",
            "#BSUB -J " + job_name + "\n",
            "#BSUB -o " + os.path.join(log_dir, job_name + ".log") + "\n",
            "#BSUB -q cmscaf1nd \n",
            "JobName=" + job_name + " \n",
            "OUT_DIR=" + self.OUTDIR + " \n",
            "LXBATCH_DIR=`pwd` \n",
            "cd " + os.path.join(self.CMSSW_dir, "src") + " \n",
            "eval `scram runtime -sh` \n",
            "cd $LXBATCH_DIR \n",
            "cmsRun " + os.path.join(self.cfg_dir, self.outputCfgName) + " \n",
            "ls -lh . \n",
            "for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n",
            "for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n",
        ]
        with open(os.path.join(self.LSF_dir, self.output_LSF_name), 'w') as fout:
            fout.writelines(lines)

    def createTheBashFile(self):
        """Write the bash script ``<the_dir>/BASH/<output_number_name>.sh``.

        Also creates ``<the_dir>/log``. Requires createTheCfgFile() (for the
        cfg path) and setEOSout() (for OUT_DIR) to have been called.
        """
        # directory to store the BASH to be submitted
        self.BASH_dir = os.path.join(self.the_dir, "BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name = self.output_number_name + ".sh"
        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir, "log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        lines = [
            "#!/bin/bash \n",
            "JobName=" + job_name + " \n",
            "echo \"Job started at \" `date` \n",
            "CMSSW_DIR=" + os.path.join(self.CMSSW_dir, "src") + " \n",
            "export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n",
            "OUT_DIR=" + self.OUTDIR + " \n",
            "LXBATCH_DIR=$PWD \n",
            "cd ${CMSSW_DIR} \n",
            "eval `scramv1 runtime -sh` \n",
            "echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n",
            "cd $LXBATCH_DIR \n",
            "cp " + os.path.join(self.cfg_dir, self.outputCfgName) + " . \n",
            "echo \"cmsRun " + self.outputCfgName + "\" \n",
            "cmsRun " + self.outputCfgName + " \n",
            "echo \"Content of working dir is \"`ls -lh` \n",
            "for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n",
            "echo \"Job ended at \" `date` \n",
            "exit 0 \n",
        ]
        with open(os.path.join(self.BASH_dir, self.output_BASH_name), 'w') as fout:
            fout.writelines(lines)

    def getOutputFileName(self):
        """Return ``<OUTDIR>/<output_full_name>.root`` (requires setEOSout())."""
        return os.path.join(self.OUTDIR, self.output_full_name + ".root")

    def submit(self):
        """Make the LSF script executable and submit it with ``bsub``.

        The text printed by bsub is stored in ``self.batch_job_id``.
        Requires createTheLSFFile() to have been called.
        """
        print("submit job", self.job_id)
        lsf_file = os.path.join(self.LSF_dir, self.output_LSF_name)
        os.system("chmod u+x " + lsf_file)
        self.batch_job_id = getCommandOutput("bsub < " + lsf_file)

    def getBatchjobId(self):
        """Return the text between the first ``<`` and the next ``>`` of the
        stored bsub output. For example, ``"Job <123> is submitted to queue <q>."``
        gives ``"123"``."""
        return self.batch_job_id.split("<")[1].split(">")[0]
604 
605 
606 def main():
608 
609 
610  if not check_proxy():
611  print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
612  sys.exit(1)
613 
614 
615  forward_proxy(".")
616 
617  global CopyRights
618  print('\n'+CopyRights)
619 
620  HOME = os.environ.get('HOME')
622  # CMSSW section
623  input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
624  lib_path = os.path.abspath(os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test"))
625  sys.path.append(lib_path)
626 
627 
628  srcFiles = []
630  desc="""This is a description of %prog."""
631  parser = OptionParser(description=desc,version='%prog version 0.1')
632  parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
633  parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
634  parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
635  parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
636  parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
637  parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
638  parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
639  parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
640  parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
641  parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
642  (opts, args) = parser.parse_args()
644  now = datetime.datetime.now()
645  #t = now.strftime("test_%Y_%m_%d_%H_%M_%S_")
646  #t = "2016UltraLegacy"
647  #t = "2017UltraLegacy"
648  #t = "2018UltraLegacy"
649  t=""
650  t+=opts.taskname
651 
652  USER = os.environ.get('USER')
653  eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
654 
655  if opts.submit:
656  mkdir_eos(eosdir)
657  else:
658  print("Not going to create EOS folder. -s option has not been chosen")
659 
660 
661 
662  jobName = []
663  isMC = []
664  isDA = []
665  doRunBased = []
666  maxevents = []
667 
668  gt = []
669  allFromGT = []
670  applyEXTRACOND = []
671  extraCondVect = []
672  alignmentDB = []
673  alignmentTAG = []
674  apeDB = []
675  apeTAG = []
676  applyBOWS = []
677  bowDB = []
678  bowTAG = []
679  conditions = []
680 
681  vertextype = []
682  tracktype = []
683  refittertype = []
684  ttrhtype = []
685 
686  applyruncontrol = []
687  ptcut = []
688  runboundary = []
689  lumilist = []
690 
691  ConfigFile = opts.inputconfig
692 
693  if ConfigFile is not None:
694 
695  print("********************************************************")
696  print("* Parsing from input file:", ConfigFile," ")
697 
698  config = BetterConfigParser()
699  config.read(ConfigFile)
700 
701  print("Parsed the following configuration \n\n")
702  inputDict = as_dict(config)
703  pprint.pprint(inputDict)
704 
705  if(not bool(inputDict)):
706  raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
707 
708  #print config.sections()
709 
710  # please notice: since in principle one wants to run on several different samples simultaneously,
711  # all these inputs are vectors
712 
713  doRunBased = opts.doRunBased
714 
715  listOfValidations = config.getResultingSection("validations")
716 
717  for item in listOfValidations:
718  if (bool(listOfValidations[item]) == True):
719 
720  jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
721  isDA.append(ConfigSectionMap(config,"Job")['isda'])
722  isMC.append(ConfigSectionMap(config,"Job")['ismc'])
723  maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
724 
725  gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
726  allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
727  applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
728  conditions.append(config.getResultingSection("ExtraConditions"))
729 
730  alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
731  alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
732  apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
733  apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
734  applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
735  bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
736  bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
737 
738  vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
739  tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
740 
741 
742 
743  if(config.exists("Refit","refittertype")):
744  refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
745  else:
746  refittertype.append(str(RefitType.COMMON))
747 
748  if(config.exists("Refit","ttrhtype")):
749  ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
750  else:
751  ttrhtype.append("WithAngleAndTemplate")
752 
753  applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
754  ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
755  runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
756  lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
757  else :
758 
759  print("********************************************************")
760  print("* Parsing from command line *")
761  print("********************************************************")
762 
763  jobName = ['testing']
764  isDA = ['True']
765  isMC = ['True']
766  doRunBased = opts.doRunBased
767  maxevents = ['10000']
768 
769  gt = ['74X_dataRun2_Prompt_v4']
770  allFromGT = ['False']
771  applyEXTRACOND = ['False']
772  conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
773  alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
774  alignmentTAG = ['TrackerAlignment_Prompt']
775  apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
776  apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
777  applyBOWS = ['True']
778  bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
779  bowTAG = ['TrackerSurafceDeformations_v1_express']
780 
781  vertextype = ['offlinePrimaryVertices']
782  tracktype = ['ALCARECOTkAlMinBias']
783 
784  applyruncontrol = ['False']
785  ptcut = ['3']
786  runboundary = ['1']
787  lumilist = ['']
788 
789  # print some of the configuration
790 
791  print("********************************************************")
792  print("* Configuration info *")
793  print("********************************************************")
794  print("- submitted : ",opts.submit)
795  print("- taskname : ",opts.taskname)
796  print("- Jobname : ",jobName)
797  print("- use DA : ",isDA)
798  print("- is MC : ",isMC)
799  print("- is run-based: ",doRunBased)
800  print("- evts/job : ",maxevents)
801  print("- GlobatTag : ",gt)
802  print("- allFromGT? : ",allFromGT)
803  print("- extraCond? : ",applyEXTRACOND)
804  print("- extraCond : ",conditions)
805  print("- Align db : ",alignmentDB)
806  print("- Align tag : ",alignmentTAG)
807  print("- APE db : ",apeDB)
808  print("- APE tag : ",apeTAG)
809  print("- use bows? : ",applyBOWS)
810  print("- K&B db : ",bowDB)
811  print("- K&B tag : ",bowTAG)
812  print("- VertexColl : ",vertextype)
813  print("- TrackColl : ",tracktype)
814  print("- RefitterSeq : ",refittertype)
815  print("- TTRHBuilder : ",ttrhtype)
816  print("- RunControl? : ",applyruncontrol)
817  print("- Pt> ",ptcut)
818  print("- run= ",runboundary)
819  print("- JSON : ",lumilist)
820  print("- Out Dir : ",eosdir)
821 
822  print("********************************************************")
823  print("Will run on",len(jobName),"workflows")
824 
825  myRuns = []
826  mylist = {}
828  if(doRunBased):
829  print(">>>> This is Data!")
830  print(">>>> Doing run based selection")
831  cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
832  p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
833  out, err = p.communicate()
834  #print(out)
835  listOfRuns=out.decode().split("\n")
836  listOfRuns.pop()
837  listOfRuns.sort()
838  print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
839 
840  mytuple=[]
842  print("first run:",opts.start,"last run:",opts.end)
843 
844  for run in listOfRuns:
845  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
846  print("excluding",run)
847  continue
848 
849  if not isInJSON(run,lumilist[0]):
850  continue
851 
852  else:
853  print("'======> taking",run)
854  #print "preparing run",run
855  #if(int(run)%100==0):
856  mytuple.append((run,opts.data))
857 
858  #print mytuple
859 
860  instances=[opts.instance for entry in mytuple]
861  pool = multiprocessing.Pool(processes=20) # start 20 worker processes
862  count = pool.map(getFilesForRun,zip(mytuple,instances))
863  file_info = dict(zip(listOfRuns, count))
865  #print file_info
866 
867  for run in listOfRuns:
868  if (int(run)<int(opts.start) or int(run)>int(opts.end)):
869  print('rejecting run',run,' becasue outside of boundaries')
870  continue
871 
872  if not isInJSON(run,lumilist[0]):
873  print('rejecting run',run,' becasue outside not in JSON')
874  continue
875 
876  #if(int(run)%100==0):
877  # print "preparing run",run
878  myRuns.append(run)
879  #cmd2 = ' das_client --limit=0 --query \'file run='+run+' dataset='+opts.data+'\''
880  #q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
881  #out2, err2 = q.communicate()
882 
883  #out2=getFilesForRun((run,opts.data))
884  #print out2
885  #pool.map(getFilesForRun,run,opts.data)
886 
887 
888  #if run in file_info:
889  #mylist[run] = file_info[run]
890  #print run,mylist[run]
891  #mylist[run] = out2.split('\n')
892  #print mylist
893  #mylist[run].pop()
894  #print mylist
895 
896  od = collections.OrderedDict(sorted(file_info.items()))
897  # print od
898 
899 
900  if(len(myRuns)==0):
901  if(opts.isUnitTest):
902  print('\n')
903  print('=' * 70)
904  print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
905  print('=' * 70)
906  print('\n')
907  sys.exit(0)
908  else:
909  raise Exception('Will not run on any run.... please check again the configuration')
910  else:
911  # get from the DB the int luminosities
912  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
914  if(opts.verbose):
915  pprint.pprint(myLumiDB)
916 
917  # start loop on samples
918  for iConf in range(len(jobName)):
919  print("This is Task n.",iConf+1,"of",len(jobName))
920 
921 
922 
923  # for hadd script
924  scripts_dir = "scripts"
925  if not os.path.exists(scripts_dir):
926  os.makedirs(scripts_dir)
927  hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
928  fout = open(hadd_script_file,'w')
929 
930  output_file_list1=list()
931  output_file_list2=list()
932  output_file_list2.append("hadd ")
933 
934  inputFiles = []
936  if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
937  if(to_bool(isMC[iConf])):
938  print("this is MC")
939  cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
940  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
941  out,err = s.communicate()
942  mylist = out.decode().split('\n')
943  mylist.pop()
944  #print mylist
945 
946  splitList = split(mylist,10)
947  for files in splitList:
948  inputFiles.append(files)
949  myRuns.append(str(1))
950  else:
951  print("this is DATA (not doing full run-based selection)")
952  print(runboundary[iConf])
953  cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
954  #print cmd
955  s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
956  out,err = s.communicate()
957  #print(out)
958  mylist = out.decode().split('\n')
959  mylist.pop()
960  #print "len(mylist):",len(mylist)
961  print("mylist:",mylist)
962 
963  splitList = split(mylist,10)
964  for files in splitList:
965  inputFiles.append(files)
966  myRuns.append(str(runboundary[iConf]))
967 
968  myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)
969 
970  else:
971  #pass
972  for element in od:
973  #print mylist[element]
974  inputFiles.append(od[element])
975  #print element,od[element]
976  #print mylist
977 
978  #print inputFiles
979 
980 
981  batchJobIds = []
982  mergedFile = None
984  if(opts.verbose):
985  print("myRuns =====>",myRuns)
986 
987  totalJobs=0
988  theBashDir=None
989  theBaseName=None
991  for jobN,theSrcFiles in enumerate(inputFiles):
992  if(opts.verbose):
993  print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
994  else:
995  print("JOB:",jobN,"run",myRuns[jobN])
996  thejobIndex=None
997  theLumi='1'
999  #if(to_bool(isMC[iConf]) and (not to_bool(doRunBased))):
1000  if(to_bool(isMC[iConf])):
1001  thejobIndex=jobN
1002  else:
1003  if(doRunBased):
1004  thejobIndex=myRuns[jobN]
1005  else:
1006  thejobIndex=myRuns[jobN]+"_"+str(jobN)
1007 
1008  if (myRuns[jobN]) in myLumiDB:
1009  theLumi = myLumiDB[myRuns[jobN]]
1010  else:
1011  print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
1012  theLumi='1'
1013  print("int. lumi:",theLumi,"/pb")
1014 
1015  #print 'the configuration is:',iConf,' theJobIndex is:',thejobIndex
1016  #print applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf]
1017 
1018  runInfo = {}
1019  runInfo["run"] = myRuns[jobN]
1020  #runInfo["runevents"] = getNEvents(myRuns[jobN],opts.data)
1021  runInfo["conf"] = jobName[iConf]
1022  runInfo["gt"] = gt[iConf]
1023  runInfo["allFromGT"] = allFromGT[iConf]
1024  runInfo["alignmentDB"] = alignmentDB[iConf]
1025  runInfo["alignmentTag"] = alignmentTAG[iConf]
1026  runInfo["apeDB"] = apeDB[iConf]
1027  runInfo["apeTag"] = apeTAG[iConf]
1028  runInfo["applyBows"] = applyBOWS[iConf]
1029  runInfo["bowDB"] = bowDB[iConf]
1030  runInfo["bowTag"] = bowTAG[iConf]
1031  runInfo["ptCut"] = ptcut[iConf]
1032  runInfo["lumilist"] = lumilist[iConf]
1033  runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
1034  runInfo["conditions"] = conditions[iConf]
1035  runInfo["nfiles"] = len(theSrcFiles)
1036  runInfo["srcFiles"] = theSrcFiles
1037  runInfo["intLumi"] = theLumi
1038 
1039  updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1040 
1041  totalJobs=totalJobs+1
1042 
1043  aJob = Job(opts.data,
1044  jobN,
1045  thejobIndex,
1046  jobName[iConf],isDA[iConf],isMC[iConf],
1047  applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1048  myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1049  gt[iConf],allFromGT[iConf],
1050  alignmentDB[iConf], alignmentTAG[iConf],
1051  apeDB[iConf], apeTAG[iConf],
1052  bowDB[iConf], bowTAG[iConf],
1053  vertextype[iConf], tracktype[iConf],
1054  refittertype[iConf], ttrhtype[iConf],
1055  applyruncontrol[iConf],
1056  ptcut[iConf],input_CMSSW_BASE,'.')
1057 
1058  aJob.setEOSout(eosdir)
1059  aJob.createTheCfgFile(theSrcFiles)
1060  aJob.createTheBashFile()
1061 
1062  output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
1063  if jobN == 0:
1064  theBashDir=aJob.BASH_dir
1065  theBaseName=aJob.getOutputBaseNameWithData()
1066  mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
1067  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
1068  output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
1069  del aJob
1070 
1071  job_submit_file = write_HTCondor_submit_file(theBashDir,theBaseName,totalJobs,None)
1072 
1073  if opts.submit:
1074  os.system("chmod u+x "+theBashDir+"/*.sh")
1075  submissionCommand = "condor_submit "+job_submit_file
1076  submissionOutput = getCommandOutput(submissionCommand)
1077  print(submissionOutput)
1078 
1079  fout.write("#!/bin/bash \n")
1080  fout.write("MAIL=$USER@mail.cern.ch \n")
1081  fout.write("OUT_DIR="+eosdir+"\n")
1082  fout.write("FILE="+str(mergedFile)+"\n")
1083  fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1084  fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
1085  fout.write("eval `scram r -sh` \n")
1086  fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
1087  fout.writelines(output_file_list1)
1088  fout.writelines(output_file_list2)
1089  fout.write("\n")
1090  fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1091  fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1092  fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")
1093 
1094  os.system("chmod u+x "+hadd_script_file)
1095 
1096  harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
1097  print(harvest_conditions)
1098  lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
1099  print(lastJobCommand)
1100  if opts.submit:
1101  lastJobOutput = getCommandOutput(lastJobCommand)
1102  print(lastJobOutput)
1103 
1104  fout.close()
1105  del output_file_list1
1106 
1107 
# Script entry point: build and (optionally) submit the validation jobs only
# when this file is executed directly, never as a side effect of an import.
if __name__ == "__main__":
    main()
1110 
1111 
1112 
1113 
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def mkdir_eos(out_path)
method to create recursively directories on EOS #############
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)
Definition: main.py:1
#define str(s)
def setEOSout(self, theEOSdir)