3 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows, 6 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r 9 from __future__
import print_function
10 from builtins
import range
12 __author__ =
'Marco Musich' 13 __copyright__ =
'Copyright 2020, CERN CMS' 14 __credits__ = [
'Ernesto Migliore',
'Salvatore Di Guida']
15 __license__ =
'Unknown' 16 __maintainer__ =
'Marco Musich' 17 __email__ =
'marco.musich@cern.ch' 25 import configparser
as ConfigParser
29 from optparse
import OptionParser
30 from subprocess
import Popen, PIPE
34 import multiprocessing
41 CopyRights =
'##################################\n' 42 CopyRights +=
'# submitPVValidationJobs.py #\n' 43 CopyRights +=
'# marco.musich@cern.ch #\n' 44 CopyRights +=
'# April 2020 #\n' 45 CopyRights +=
'##################################\n' 50 """Check if GRID proxy has been initialized.""" 53 with open(os.devnull,
"w")
as dump:
54 subprocess.check_call([
"voms-proxy-info",
"--exists"],
55 stdout = dump, stderr = dump)
56 except subprocess.CalledProcessError:
63 """Forward proxy to location visible from the batch system. 65 - `rundir`: directory for storing the forwarded proxy 69 print(
"Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
72 local_proxy = subprocess.check_output([
"voms-proxy-info",
"--path"]).
strip()
73 shutil.copyfile(local_proxy, os.path.join(rundir,
".user_proxy"))
78 """Writes 'job.submit' file in `path`. 80 - `path`: job directory 81 - `script`: script to be executed 82 - `proxy_path`: path to proxy (only used in case of requested proxy forward) 85 job_submit_template=
"""\ 87 requirements = (OpSysAndVer =?= "CentOS7") 88 executable = {script:s} 89 output = {jobm:s}/{out:s}.out 90 error = {jobm:s}/{out:s}.err 91 log = {jobm:s}/{out:s}.log 92 transfer_output_files = "" 93 +JobFlavour = "{flavour:s}" 96 if proxy_path
is not None:
97 job_submit_template +=
"""\ 98 +x509userproxy = "{proxy:s}" 101 job_submit_file = os.path.join(path,
"job_"+name+
".submit")
102 with open(job_submit_file,
"w")
as f:
103 f.write(job_submit_template.format(script = os.path.join(path,name+
"_$(ProcId).sh"),
104 out = name+
"_$(ProcId)",
105 jobm = os.path.abspath(path),
106 flavour =
"tomorrow",
110 return job_submit_file
115 """This function executes `command` and returns it output. 117 - `command`: Shell command to be invoked by this function. 119 child = os.popen(command)
123 print(
'%s failed w/ exit code %d' % (command, err))
129 cmd2 =
' dasgoclient -limit=0 -query \'file run='+blob[0][0]+
' dataset='+blob[0][1]+ (
' instance='+blob[1]+
'\'' if (blob[1]
is not None)
else '\'')
131 q = Popen(cmd2 , shell=
True, stdout=PIPE, stderr=PIPE)
132 out, err = q.communicate()
134 outputList = out.decode().
split(
'\n')
141 nEvents = subprocess.check_output([
"das_client",
"--limit",
"0",
"--query",
"summary run={} dataset={} | grep summary.nevents".
format(run, dataset)])
142 return 0
if nEvents ==
"[]\n" else int(nEvents)
147 """Expects something like 148 +-------+------+--------+--------+-------------------+------------------+ 149 | nfill | nrun | nls | ncms | totdelivered(/fb) | totrecorded(/fb) | 150 +-------+------+--------+--------+-------------------+------------------+ 151 | 73 | 327 | 142418 | 138935 | 19.562 | 18.036 | 152 +-------+------+--------+--------+-------------------+------------------+ 153 And extracts the total recorded luminosity (/b). 162 output = subprocess.check_output([homedir+
"/.local/bin/brilcalc",
"lumi",
"-b",
"STABLE BEAMS",
"-u",
"/pb",
"--begin",
str(minRun),
"--end",
str(maxRun),
"--output-style",
"csv",
"-c",
"web"])
164 warnings.warn(
'ATTENTION! Impossible to query the BRIL DB!')
168 print(
"INSIDE GET LUMINOSITY")
171 for line
in output.decode().
split(
"\n"):
172 if (
"#" not in line):
173 runToCache = line.split(
",")[0].
split(
":")[0]
174 lumiToCache = line.split(
",")[-1].
replace(
"\r",
"")
177 myCachedLumi[runToCache] = lumiToCache
187 with open(jsonfile,
'r') as myJSON: jsonDATA = json.load(myJSON) 188 return (run
in jsonDATA)
190 warnings.warn(
'ATTENTION! Impossible to find lumi mask! All runs will be used.')
197 for section
in config.sections():
198 dictionary[section] = {}
199 for option
in config.options(section):
200 dictionary[section][option] = config.get(section, option)
208 Converts 'something' to boolean. Raises exception for invalid formats 209 Possible True values: 1, True, "1", "TRue", "yes", "y", "t" 210 Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ... 212 if str(value).lower()
in (
"yes",
"y",
"true",
"t",
"1"):
return True 213 if str(value).lower()
in (
"no",
"n",
"false",
"f",
"0",
"0.0",
"",
"none",
"[]",
"{}"):
return False 214 raise Exception(
'Invalid value for boolean conversion: ' +
str(value))
219 dbName =
"runInfo.pkl" 221 if os.path.exists(dbName):
222 with open(dbName,
'rb')
as f:
223 infos = pickle.load(f)
225 for f
in glob.glob(
"root-files/Run*.root"):
226 run = runFromFilename(f)
229 infos[run][
"start_time"] = getRunStartTime(run)
232 with open(dbName,
"wb")
as f:
233 pickle.dump(infos, f)
238 dbName =
"runInfo.pkl" 240 if os.path.exists(dbName):
241 with open(dbName,
'rb')
as f:
242 infos = pickle.load(f)
247 with open(dbName,
"wb")
as f:
248 pickle.dump(infos, f)
258 def exists( self, section, option):
260 items = self.items(section)
261 except ConfigParser.NoSectionError:
264 if item[0] == option:
272 for option
in self.options( section ):
273 result[option] = self.get( section, option )
274 if "local"+section.title()
in self.sections():
275 for option
in self.options(
"local"+section.title() ):
276 result[option] = self.get(
"local"+section.title(),option )
277 except ConfigParser.NoSectionError
as section:
278 msg = (
"%s in configuration files. This section is mandatory." 285 result = copy.deepcopy(defaultDict)
286 for option
in demandPars:
288 result[option] = self.get( section, option )
289 except ConfigParser.NoOptionError
as globalSectionError:
290 globalSection =
str( globalSectionError ).
split(
"'" )[-2]
291 splittedSectionName = section.split(
":" )
292 if len( splittedSectionName ) > 1:
293 localSection = (
"local"+section.split(
":" )[0].
title()+
":" 294 +section.split(
":")[1])
296 localSection = (
"local"+section.split(
":" )[0].
title())
297 if self.has_section( localSection ):
299 result[option] = self.get( localSection, option )
300 except ConfigParser.NoOptionError
as option:
301 msg = (
"%s. This option is mandatory." 304 "section '"+globalSection+
"' or", 1)))
307 msg = (
"%s. This option is mandatory." 308 %(
str(globalSectionError).
replace(
":",
"", 1)))
317 options = config.options(section)
318 for option
in options:
320 the_dict[option] = config.get(section, option)
321 if the_dict[option] == -1:
322 DebugPrint(
"skip: %s" % option)
324 print(
"exception on %s!" % option)
325 the_dict[option] =
None 332 for dir
in out_path.split(
'/'):
333 newpath=os.path.join(newpath,dir)
335 if newpath.find(
'test_out') > 0:
337 command=
"/afs/cern.ch/project/eos/installation/cms/bin/eos.select mkdir "+newpath
338 p = subprocess.Popen(command,shell=
True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
339 (out, err) = p.communicate()
344 command2=
"/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls "+out_path
345 p = subprocess.Popen(command2,shell=
True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
346 (out, err) = p.communicate()
def split(sequence, size):
    """Yield consecutive slices of `sequence`, each holding at most `size` items.

    Auxiliary generator used to group a list into sub-lists of `size`
    elements; the final slice is shorter when `len(sequence)` is not a
    multiple of `size`.

    Args:
        sequence: Any object supporting `len()` and slicing (list, tuple, str).
        size: Slice length, used as the step of `range`.

    Yields:
        `sequence[i:i+size]` for i = 0, size, 2*size, ...

    Raises:
        ValueError: raised by `range` when `size` is 0, once iteration starts.
    """
    # Step through the start index of every slice; slicing clamps the end
    # index, so the last slice never runs past the end of the sequence.
    for i in range(0, len(sequence), size):
        yield sequence[i:i+size]
364 def __init__(self,dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir ,the_dir):
367 theDataSet = dataset.split(
"/")[1]+
"_"+(dataset.split(
"/")[2]).
split(
"-")[0]
369 self.data = theDataSet
370 self.job_number = job_number
372 self.batch_job_id =
None 373 self.job_name = job_name
377 self.applyBOWS = applyBOWS
378 self.applyEXTRACOND = applyEXTRACOND
379 self.extraCondVect = extraconditions
380 self.runboundary = runboundary
381 self.lumilist = lumilist
382 self.intlumi = intlumi
383 self.maxevents = maxevents
385 self.allFromGT = allFromGT
386 self.alignmentDB = alignmentDB
387 self.alignmentTAG = alignmentTAG
392 self.vertextype = vertextype
393 self.tracktype = tracktype
394 self.refittertype = refittertype
395 self.ttrhtype = ttrhtype
396 self.applyruncontrol = applyruncontrol
400 self.CMSSW_dir=CMSSW_dir
402 self.output_full_name=self.getOutputBaseName()+
"_"+
str(self.job_id)
403 self.output_number_name=self.getOutputBaseNameWithData()+
"_"+
str(self.job_number)
406 self.outputCfgName=
None 411 self.output_LSF_name=
None 412 self.output_BASH_name=
None 422 self.OUTDIR = theEOSdir
426 return "PVValidation_"+self.job_name
430 return "PVValidation_"+self.job_name+
"_"+self.data
438 self.cfg_dir = os.path.join(self.the_dir,
"cfg")
439 if not os.path.exists(self.cfg_dir):
440 os.makedirs(self.cfg_dir)
442 self.outputCfgName=self.output_full_name+
"_cfg.py" 443 fout=open(os.path.join(self.cfg_dir,self.outputCfgName),
'w')
445 template_cfg_file = os.path.join(self.CMSSW_dir,
"src/Alignment/OfflineValidation/test",
"PVValidation_T_cfg.py")
446 file = open(template_cfg_file,
'r') 448 config_txt = '\n\n' + CopyRights +
'\n\n' 449 config_txt += file.read()
450 config_txt=config_txt.replace(
"ISDATEMPLATE",self.isDA)
451 config_txt=config_txt.replace(
"ISMCTEMPLATE",self.isMC)
452 config_txt=config_txt.replace(
"APPLYBOWSTEMPLATE",self.applyBOWS)
453 config_txt=config_txt.replace(
"EXTRACONDTEMPLATE",self.applyEXTRACOND)
454 config_txt=config_txt.replace(
"USEFILELISTTEMPLATE",
"True")
455 config_txt=config_txt.replace(
"RUNBOUNDARYTEMPLATE",self.runboundary)
456 config_txt=config_txt.replace(
"LUMILISTTEMPLATE",self.lumilist)
457 config_txt=config_txt.replace(
"MAXEVENTSTEMPLATE",self.maxevents)
458 config_txt=config_txt.replace(
"GLOBALTAGTEMPLATE",self.gt)
459 config_txt=config_txt.replace(
"ALLFROMGTTEMPLATE",self.allFromGT)
460 config_txt=config_txt.replace(
"ALIGNOBJTEMPLATE",self.alignmentDB)
461 config_txt=config_txt.replace(
"GEOMTAGTEMPLATE",self.alignmentTAG)
462 config_txt=config_txt.replace(
"APEOBJTEMPLATE",self.apeDB)
463 config_txt=config_txt.replace(
"ERRORTAGTEMPLATE",self.apeTAG)
464 config_txt=config_txt.replace(
"BOWSOBJECTTEMPLATE",self.bowDB)
465 config_txt=config_txt.replace(
"BOWSTAGTEMPLATE",self.bowTAG)
466 config_txt=config_txt.replace(
"VERTEXTYPETEMPLATE",self.vertextype)
467 config_txt=config_txt.replace(
"TRACKTYPETEMPLATE",self.tracktype)
468 config_txt=config_txt.replace(
"REFITTERTEMPLATE",self.refittertype)
469 config_txt=config_txt.replace(
"TTRHBUILDERTEMPLATE",self.ttrhtype)
470 config_txt=config_txt.replace(
"PTCUTTEMPLATE",self.ptcut)
471 config_txt=config_txt.replace(
"INTLUMITEMPLATE",self.intlumi)
472 config_txt=config_txt.replace(
"RUNCONTROLTEMPLATE",self.applyruncontrol)
473 lfn_with_quotes =
map(
lambda x:
"\'"+x+
"\'",lfn)
474 config_txt=config_txt.replace(
"FILESOURCETEMPLATE",
"["+
",".
join(lfn_with_quotes)+
"]")
475 config_txt=config_txt.replace(
"OUTFILETEMPLATE",self.output_full_name+
".root")
479 for element
in self.extraCondVect :
480 if(
"Rcd" in element):
481 params = self.extraCondVect[element].
split(
',')
483 process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone( 484 connect = cms.string('{database}'), 485 toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'), 486 tag = cms.string('{tag}'), 487 label = cms.untracked.string('{label}') 491 process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}") 492 '''.
format(record = element, database = params[0], tag = params[1], label = (params[2]
if len(params)>2
else ''))
495 if(self.applyEXTRACOND==
"True"):
496 if not self.extraCondVect:
497 raise Exception(
'Requested extra conditions, but none provided')
499 config_txt=config_txt.replace(
"END OF EXTRA CONDITIONS",textToWrite)
501 print(
"INFO: Will not apply any extra conditions")
504 fout.write(config_txt)
513 self.LSF_dir = os.path.join(self.the_dir,
"LSF")
514 if not os.path.exists(self.LSF_dir):
515 os.makedirs(self.LSF_dir)
517 self.output_LSF_name=self.output_full_name+
".lsf" 518 fout=open(os.path.join(self.LSF_dir,self.output_LSF_name),
'w')
520 job_name = self.output_full_name
522 log_dir = os.path.join(self.the_dir,
"log")
523 if not os.path.exists(log_dir):
526 fout.write(
"#!/bin/sh \n")
527 fout.write(
"#BSUB -L /bin/sh\n")
528 fout.write(
"#BSUB -J "+job_name+
"\n")
529 fout.write(
"#BSUB -o "+os.path.join(log_dir,job_name+
".log")+
"\n")
530 fout.write(
"#BSUB -q cmscaf1nd \n")
531 fout.write(
"JobName="+job_name+
" \n")
532 fout.write(
"OUT_DIR="+self.OUTDIR+
" \n")
533 fout.write(
"LXBATCH_DIR=`pwd` \n")
534 fout.write(
"cd "+os.path.join(self.CMSSW_dir,
"src")+
" \n")
535 fout.write(
"eval `scram runtime -sh` \n")
536 fout.write(
"cd $LXBATCH_DIR \n")
537 fout.write(
"cmsRun "+os.path.join(self.cfg_dir,self.outputCfgName)+
" \n")
538 fout.write(
"ls -lh . \n")
539 fout.write(
"for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
540 fout.write(
"for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")
549 self.BASH_dir = os.path.join(self.the_dir,
"BASH")
550 if not os.path.exists(self.BASH_dir):
551 os.makedirs(self.BASH_dir)
553 self.output_BASH_name=self.output_number_name+
".sh" 554 fout=open(os.path.join(self.BASH_dir,self.output_BASH_name),
'w')
556 job_name = self.output_full_name
558 log_dir = os.path.join(self.the_dir,
"log")
559 if not os.path.exists(log_dir):
562 fout.write(
"#!/bin/bash \n")
564 fout.write(
"JobName="+job_name+
" \n")
565 fout.write(
"echo \"Job started at \" `date` \n")
566 fout.write(
"CMSSW_DIR="+os.path.join(self.CMSSW_dir,
"src")+
" \n")
567 fout.write(
"export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
568 fout.write(
"OUT_DIR="+self.OUTDIR+
" \n")
569 fout.write(
"LXBATCH_DIR=$PWD \n")
571 fout.write(
"cd ${CMSSW_DIR} \n")
572 fout.write(
"eval `scramv1 runtime -sh` \n")
573 fout.write(
"echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
574 fout.write(
"cd $LXBATCH_DIR \n")
575 fout.write(
"cp "+os.path.join(self.cfg_dir,self.outputCfgName)+
" . \n")
576 fout.write(
"echo \"cmsRun "+self.outputCfgName+
"\" \n")
577 fout.write(
"cmsRun "+self.outputCfgName+
" \n")
578 fout.write(
"echo \"Content of working dir is \"`ls -lh` \n")
580 fout.write(
"for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
582 fout.write(
"echo \"Job ended at \" `date` \n")
583 fout.write(
"exit 0 \n")
589 return os.path.join(self.OUTDIR,self.output_full_name+
".root")
593 print(
"submit job", self.job_id)
594 job_name = self.output_full_name
595 submitcommand1 =
"chmod u+x " + os.path.join(self.LSF_dir,self.output_LSF_name)
596 child1 = os.system(submitcommand1)
599 self.batch_job_id =
getCommandOutput(
"bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name))
603 return self.batch_job_id.
split(
"<")[1].
split(
">")[0]
611 print(
"Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
618 print(
'\n'+CopyRights)
620 HOME = os.environ.get(
'HOME')
623 input_CMSSW_BASE = os.environ.get(
'CMSSW_BASE')
624 lib_path = os.path.abspath(os.path.join(input_CMSSW_BASE,
"src/Alignment/OfflineValidation/test"))
625 sys.path.append(lib_path)
630 desc=
"""This is a description of %prog.""" 631 parser = OptionParser(description=desc,version=
'%prog version 0.1')
632 parser.add_option(
'-s',
'--submit', help=
'job submitted', dest=
'submit', action=
'store_true', default=
False)
633 parser.add_option(
'-j',
'--jobname', help=
'task name', dest=
'taskname', action=
'store', default=
'myTask')
634 parser.add_option(
'-D',
'--dataset', help=
'selected dataset', dest=
'data', action=
'store', default=
'')
635 parser.add_option(
'-r',
'--doRunBased',help=
'selected dataset', dest=
'doRunBased', action=
'store_true' , default=
False)
636 parser.add_option(
'-i',
'--input', help=
'set input configuration (overrides default)', dest=
'inputconfig',action=
'store',default=
None)
637 parser.add_option(
'-b',
'--begin', help=
'starting point', dest=
'start', action=
'store', default=
'1')
638 parser.add_option(
'-e',
'--end', help=
'ending point', dest=
'end', action=
'store', default=
'999999')
639 parser.add_option(
'-v',
'--verbose', help=
'verbose output', dest=
'verbose', action=
'store_true', default=
False)
640 parser.add_option(
'-u',
'--unitTest', help=
'unit tests?', dest=
'isUnitTest', action=
'store_true', default=
False)
641 parser.add_option(
'-I',
'--instance', help=
'DAS instance to use', dest=
'instance', action=
'store', default=
None)
642 (opts, args) = parser.parse_args()
644 now = datetime.datetime.now()
652 USER = os.environ.get(
'USER')
653 eosdir=os.path.join(
"/store/group/alca_trackeralign",USER,
"test_out",t)
658 print(
"Not going to create EOS folder. -s option has not been chosen")
691 ConfigFile = opts.inputconfig
693 if ConfigFile
is not None:
695 print(
"********************************************************")
696 print(
"* Parsing from input file:", ConfigFile,
" ")
699 config.read(ConfigFile)
701 print(
"Parsed the following configuration \n\n")
703 pprint.pprint(inputDict)
706 raise SystemExit(
"\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
713 doRunBased = opts.doRunBased
715 listOfValidations = config.getResultingSection(
"validations")
717 for item
in listOfValidations:
718 if (
bool(listOfValidations[item]) ==
True):
727 applyEXTRACOND.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'applyextracond'])
728 conditions.append(config.getResultingSection(
"ExtraConditions"))
730 alignmentDB.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'alignmentdb'])
731 alignmentTAG.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'alignmenttag'])
743 if(config.exists(
"Refit",
"refittertype")):
746 refittertype.append(
str(RefitType.COMMON))
748 if(config.exists(
"Refit",
"ttrhtype")):
751 ttrhtype.append(
"WithAngleAndTemplate")
753 applyruncontrol.append(
ConfigSectionMap(config,
"Selection")[
'applyruncontrol'])
759 print(
"********************************************************")
760 print(
"* Parsing from command line *")
761 print(
"********************************************************")
763 jobName = [
'testing']
766 doRunBased = opts.doRunBased
767 maxevents = [
'10000']
769 gt = [
'74X_dataRun2_Prompt_v4']
770 allFromGT = [
'False']
771 applyEXTRACOND = [
'False']
772 conditions = [[(
'SiPixelTemplateDBObjectRcd',
'frontier://FrontierProd/CMS_CONDITIONS',
'SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
773 alignmentDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
774 alignmentTAG = [
'TrackerAlignment_Prompt']
775 apeDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
776 apeTAG = [
'TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
778 bowDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
779 bowTAG = [
'TrackerSurafceDeformations_v1_express']
781 vertextype = [
'offlinePrimaryVertices']
782 tracktype = [
'ALCARECOTkAlMinBias']
784 applyruncontrol = [
'False']
791 print(
"********************************************************")
792 print(
"* Configuration info *")
793 print(
"********************************************************")
794 print(
"- submitted : ",opts.submit)
795 print(
"- taskname : ",opts.taskname)
796 print(
"- Jobname : ",jobName)
797 print(
"- use DA : ",isDA)
798 print(
"- is MC : ",isMC)
799 print(
"- is run-based: ",doRunBased)
800 print(
"- evts/job : ",maxevents)
801 print(
"- GlobatTag : ",gt)
802 print(
"- allFromGT? : ",allFromGT)
803 print(
"- extraCond? : ",applyEXTRACOND)
804 print(
"- extraCond : ",conditions)
805 print(
"- Align db : ",alignmentDB)
806 print(
"- Align tag : ",alignmentTAG)
807 print(
"- APE db : ",apeDB)
808 print(
"- APE tag : ",apeTAG)
809 print(
"- use bows? : ",applyBOWS)
810 print(
"- K&B db : ",bowDB)
811 print(
"- K&B tag : ",bowTAG)
812 print(
"- VertexColl : ",vertextype)
813 print(
"- TrackColl : ",tracktype)
814 print(
"- RefitterSeq : ",refittertype)
815 print(
"- TTRHBuilder : ",ttrhtype)
816 print(
"- RunControl? : ",applyruncontrol)
817 print(
"- Pt> ",ptcut)
818 print(
"- run= ",runboundary)
819 print(
"- JSON : ",lumilist)
820 print(
"- Out Dir : ",eosdir)
822 print(
"********************************************************")
823 print(
"Will run on",len(jobName),
"workflows")
829 print(
">>>> This is Data!")
830 print(
">>>> Doing run based selection")
831 cmd =
'dasgoclient -limit=0 -query \'run dataset='+opts.data + (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
832 p = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
833 out, err = p.communicate()
835 listOfRuns=out.decode().
split(
"\n")
838 print(
"Will run on ",len(listOfRuns),
"runs: \n",listOfRuns)
842 print(
"first run:",opts.start,
"last run:",opts.end)
844 for run
in listOfRuns:
845 if (
int(run)<
int(opts.start)
or int(run)>
int(opts.end)):
846 print(
"excluding",run)
853 print(
"'======> taking",run)
856 mytuple.append((run,opts.data))
860 instances=[opts.instance
for entry
in mytuple]
861 pool = multiprocessing.Pool(processes=20)
862 count = pool.map(getFilesForRun,
zip(mytuple,instances))
863 file_info = dict(
zip(listOfRuns, count))
867 for run
in listOfRuns:
868 if (
int(run)<
int(opts.start)
or int(run)>
int(opts.end)):
869 print(
'rejecting run',run,
' becasue outside of boundaries')
873 print(
'rejecting run',run,
' becasue outside not in JSON')
896 od = collections.OrderedDict(sorted(file_info.items()))
904 print(
"|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
909 raise Exception(
'Will not run on any run.... please check again the configuration')
912 myLumiDB =
getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
915 pprint.pprint(myLumiDB)
918 for iConf
in range(len(jobName)):
919 print(
"This is Task n.",iConf+1,
"of",len(jobName))
924 scripts_dir =
"scripts" 925 if not os.path.exists(scripts_dir):
926 os.makedirs(scripts_dir)
927 hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+
"_"+opts.taskname+
".sh")
928 fout = open(hadd_script_file,
'w')
930 output_file_list1=list()
931 output_file_list2=list()
932 output_file_list2.append(
"hadd ")
939 cmd =
'dasgoclient -query \'file dataset='+opts.data+ (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
940 s = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
941 out,err = s.communicate()
942 mylist = out.decode().
split(
'\n')
946 splitList =
split(mylist,10)
947 for files
in splitList:
948 inputFiles.append(files)
949 myRuns.append(
str(1))
951 print(
"this is DATA (not doing full run-based selection)")
952 print(runboundary[iConf])
953 cmd =
'dasgoclient -query \'file dataset='+opts.data+
' run='+runboundary[iConf]+ (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
955 s = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
956 out,err = s.communicate()
958 mylist = out.decode().
split(
'\n')
961 print(
"mylist:",mylist)
963 splitList =
split(mylist,10)
964 for files
in splitList:
965 inputFiles.append(files)
966 myRuns.append(
str(runboundary[iConf]))
968 myLumiDB =
getLuminosity(HOME,myRuns[0],myRuns[-1],
True,opts.verbose)
974 inputFiles.append(od[element])
985 print(
"myRuns =====>",myRuns)
991 for jobN,theSrcFiles
in enumerate(inputFiles):
993 print(
"JOB:",jobN,
"run",myRuns[jobN],theSrcFiles)
995 print(
"JOB:",jobN,
"run",myRuns[jobN])
1004 thejobIndex=myRuns[jobN]
1006 thejobIndex=myRuns[jobN]+
"_"+
str(jobN)
1008 if (myRuns[jobN])
in myLumiDB:
1009 theLumi = myLumiDB[myRuns[jobN]]
1011 print(
"=====> COULD NOT FIND LUMI, setting default = 1/pb")
1013 print(
"int. lumi:",theLumi,
"/pb")
1019 runInfo[
"run"] = myRuns[jobN]
1021 runInfo[
"conf"] = jobName[iConf]
1022 runInfo[
"gt"] = gt[iConf]
1023 runInfo[
"allFromGT"] = allFromGT[iConf]
1024 runInfo[
"alignmentDB"] = alignmentDB[iConf]
1025 runInfo[
"alignmentTag"] = alignmentTAG[iConf]
1026 runInfo[
"apeDB"] = apeDB[iConf]
1027 runInfo[
"apeTag"] = apeTAG[iConf]
1028 runInfo[
"applyBows"] = applyBOWS[iConf]
1029 runInfo[
"bowDB"] = bowDB[iConf]
1030 runInfo[
"bowTag"] = bowTAG[iConf]
1031 runInfo[
"ptCut"] = ptcut[iConf]
1032 runInfo[
"lumilist"] = lumilist[iConf]
1033 runInfo[
"applyEXTRACOND"] = applyEXTRACOND[iConf]
1034 runInfo[
"conditions"] = conditions[iConf]
1035 runInfo[
"nfiles"] = len(theSrcFiles)
1036 runInfo[
"srcFiles"] = theSrcFiles
1037 runInfo[
"intLumi"] = theLumi
1039 updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1041 totalJobs=totalJobs+1
1043 aJob =
Job(opts.data,
1046 jobName[iConf],isDA[iConf],isMC[iConf],
1047 applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1048 myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1049 gt[iConf],allFromGT[iConf],
1050 alignmentDB[iConf], alignmentTAG[iConf],
1051 apeDB[iConf], apeTAG[iConf],
1052 bowDB[iConf], bowTAG[iConf],
1053 vertextype[iConf], tracktype[iConf],
1054 refittertype[iConf], ttrhtype[iConf],
1055 applyruncontrol[iConf],
1056 ptcut[iConf],input_CMSSW_BASE,
'.')
1058 aJob.setEOSout(eosdir)
1059 aJob.createTheCfgFile(theSrcFiles)
1060 aJob.createTheBashFile()
1062 output_file_list1.append(
"xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+
" /tmp/$USER/"+opts.taskname+
" \n")
1064 theBashDir=aJob.BASH_dir
1065 theBaseName=aJob.getOutputBaseNameWithData()
1066 mergedFile =
"/tmp/$USER/"+opts.taskname+
"/"+aJob.getOutputBaseName()+
" "+opts.taskname+
".root" 1067 output_file_list2.append(
"/tmp/$USER/"+opts.taskname+
"/"+aJob.getOutputBaseName()+opts.taskname+
".root ")
1068 output_file_list2.append(
"/tmp/$USER/"+opts.taskname+
"/"+os.path.split(aJob.getOutputFileName())[1]+
" ")
1074 os.system(
"chmod u+x "+theBashDir+
"/*.sh")
1075 submissionCommand =
"condor_submit "+job_submit_file
1079 fout.write(
"#!/bin/bash \n")
1080 fout.write(
"MAIL=$USER@mail.cern.ch \n")
1081 fout.write(
"OUT_DIR="+eosdir+
"\n")
1082 fout.write(
"FILE="+
str(mergedFile)+
"\n")
1083 fout.write(
"echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1084 fout.write(
"cd "+os.path.join(input_CMSSW_BASE,
"src")+
"\n")
1085 fout.write(
"eval `scram r -sh` \n")
1086 fout.write(
"mkdir -p /tmp/$USER/"+opts.taskname+
" \n")
1087 fout.writelines(output_file_list1)
1088 fout.writelines(output_file_list2)
1090 fout.write(
"echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1091 fout.write(
"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1092 fout.write(
"echo \"Harvesting for "+opts.taskname+
" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +
" completed\" $MAIL \n")
1094 os.system(
"chmod u+x "+hadd_script_file)
1096 harvest_conditions =
'"' +
" && ".
join([
"ended(" + jobId +
")" for jobId
in batchJobIds]) +
'"' 1098 lastJobCommand =
"bsub -o harvester"+opts.taskname+
".tmp -q 1nh -w "+harvest_conditions+
" "+hadd_script_file
1105 del output_file_list1
1108 if __name__ ==
"__main__":
1113 def forward_proxy(rundir)
def isInJSON(run, jsonfile)
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def getNEvents(run, dataset)
ALPAKA_FN_HOST_ACC ALPAKA_FN_INLINE constexpr float zip(ConstView const &tracks, int32_t i)
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def createTheCfgFile(self, lfn)
def createTheLSFFile(self)
def getCommandOutput(command)
def optionxform(self, optionstr)
def mkdir_eos(out_path)
method to create recursively directories on EOS #############
def getOutputBaseName(self)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def __updateDict(self, dictionary, section)
def split(sequence, size)
def updateDB(run, runInfo)
static std::string join(char **cmd)
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)
def createTheBashFile(self)
def getOutputFileName(self)
def exists(self, section, option)
def getOutputBaseNameWithData(self)
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
def setEOSout(self, theEOSdir)