'''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
usage:

submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
'''

from __future__ import print_function
from builtins import range

__author__ = 'Marco Musich'
__copyright__ = 'Copyright 2020, CERN CMS'
__credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
__license__ = 'Unknown'
__maintainer__ = 'Marco Musich'
__email__ = 'marco.musich@cern.ch'

# NOTE(review): several import lines were lost in the mangled source; the
# modules below are all demonstrably used by the function bodies in this file.
import os
import sys
import copy
import glob
import json
import pickle
import shutil
import pprint
import datetime
import warnings
import collections
import subprocess
import configparser as ConfigParser
from optparse import OptionParser
from subprocess import Popen, PIPE
import multiprocessing
# banner prepended to generated cfg files and printed at startup
CopyRights  = '##################################\n'
CopyRights += '#  submitPVValidationJobs.py     #\n'
CopyRights += '#  marco.musich@cern.ch          #\n'
CopyRights += '#  April 2020                    #\n'
CopyRights += '##################################\n'

##############################################
def check_proxy():
##############################################
    """Check if GRID proxy has been initialized."""
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout=dump, stderr=dump)
    except subprocess.CalledProcessError:
        return False
    return True
##############################################
def forward_proxy(rundir):
##############################################
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    # copy the local proxy to a fixed name inside `rundir` so batch jobs can pick it up
    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))
##############################################
def write_HTCondor_submit_file(path, logs, name, nruns, proxy_path=None):
##############################################
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `logs`: directory for condor out/err/log files
    - `name`: base name of the per-job scripts (`<name>_$(ProcId).sh`)
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    Returns the path of the written submit file.
    """
    job_submit_template = """\
requirements          = (OpSysAndVer =?= "AlmaLinux9")
executable            = {script:s}
output                = {jobm:s}/{out:s}.out
error                 = {jobm:s}/{out:s}.err
log                   = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour           = "{flavour:s}"
queue {njobs:s}
"""
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy        = "{proxy:s}"
"""

    job_submit_file = os.path.join(path, "job_"+name+".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script=os.path.join(path, name+"_$(ProcId).sh"),
                                           out=name+"_$(ProcId)",
                                           jobm=os.path.abspath(logs),
                                           flavour="tomorrow",
                                           njobs=str(nruns),
                                           proxy=proxy_path))

    return job_submit_file
##############################################
def getCommandOutput(command):
##############################################
    """This function executes `command` and returns it output.

    Arguments:
    - `command`: Shell command to be invoked by this function.
    """
    child = os.popen(command)
    data = child.read()
    err = child.close()
    if err:
        print('%s failed w/ exit code %d' % (command, err))
    return data
##############################################
def getFilesForRun(blob):
##############################################
    """Query DAS for the list of files of one (run, dataset) pair.

    Arguments:
    - `blob`: tuple ((run, dataset), instance); `instance` may be None,
      in which case no 'instance=' clause is added to the query.
    Returns the list of logical file names (one per output line).
    """
    cmd2 = ' dasgoclient -limit=0 -query \'file run='+blob[0][0]+' dataset='+blob[0][1] + \
           (' instance='+blob[1]+'\'' if (blob[1] is not None) else '\'')
    q = Popen(cmd2, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = q.communicate()
    outputList = out.decode().split('\n')
    # NOTE(review): the original likely dropped the trailing empty entry
    # produced by the final newline — TODO confirm
    if outputList and outputList[-1] == '':
        outputList.pop()
    return outputList
##############################################
def getNEvents(run, dataset):
##############################################
    """Return the number of events of `dataset` in the given `run`
    (0 when the DAS query comes back empty)."""
    nEvents = subprocess.check_output(["das_client", "--limit", "0", "--query",
                                       "summary run={} dataset={} | grep summary.nevents".format(run, dataset)])
    # check_output returns bytes under Python 3: decode before comparing with
    # the sentinel string and before int() conversion (int(bytes) raises TypeError)
    nEvents = nEvents.decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
##############################################
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
##############################################
    """Expects something like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
    +-------+------+--------+--------+-------------------+------------------+
    And extracts the total recorded luminosity (/b).
    Returns a {run: lumi} dict (empty on failure or when not run-based).
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        # brilcalc is expected to be installed in the user's ~/.local/bin
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi",
                                          "-b", "STABLE BEAMS",
                                          "-u", "/pb",
                                          "--begin", str(minRun),
                                          "--end", str(maxRun),
                                          "--output-style", "csv",
                                          "-c", "web"])
    except Exception:
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if ("#" not in line):
            # csv line: "run:fill,...,recorded"; key on the run number,
            # value is the last column with any carriage return stripped
            runToCache  = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
##############################################
def isInJSON(run, jsonfile):
##############################################
    """Return True if `run` appears as a key of the JSON lumi mask file.

    When the file cannot be read, every run is accepted (returns True)
    after emitting a warning.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
##############################################
def as_dict(config):
##############################################
    """Flatten a ConfigParser object into a {section: {option: value}} dict."""
    dictionary = {}
    for section in config.sections():
        dictionary[section] = {}
        for option in config.options(section):
            dictionary[section][option] = config.get(section, option)
    return dictionary
##############################################
def to_bool(value):
##############################################
    """
    Converts 'something' to boolean. Raises exception for invalid formats
    Possible True  values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    if str(value).lower() in ("yes", "y", "true", "t", "1"):
        return True
    if str(value).lower() in ("no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"):
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
##############################################
def updateDB2():
##############################################
    """Refresh 'runInfo.pkl' with start-time info for every local Run*.root file.

    NOTE(review): relies on helpers (`runFromFilename`, `getRunStartTime`)
    defined elsewhere in the original project — TODO confirm their module.
    """
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
##############################################
def updateDB(run, runInfo):
##############################################
    """Store (or overwrite) the `runInfo` dict for `run` in 'runInfo.pkl'
    in the current working directory."""
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    infos[run] = runInfo

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
###########################################################################
class BetterConfigParser(ConfigParser.ConfigParser):
###########################################################################
    """ConfigParser that keeps option-name case and understands
    'local<Section>' override sections."""

    ##############################################
    def optionxform(self, optionstr):
        # keep option names case-sensitive (the default lower-cases them)
        return optionstr

    ##############################################
    def exists(self, section, option):
        """Return True when `option` is present in `section` (False for a
        missing section as well)."""
        try:
            items = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        for item in items:
            if item[0] == option:
                return True
        return False

    ##############################################
    def __updateDict(self, dictionary, section):
        """Merge the options of `section` (and of an optional
        'local<Section>' override section) into `dictionary`."""
        result = dictionary
        try:
            for option in self.options(section):
                result[option] = self.get(section, option)
            if "local"+section.title() in self.sections():
                for option in self.options("local"+section.title()):
                    result[option] = self.get("local"+section.title(), option)
        except ConfigParser.NoSectionError as err:
            msg = ("%s in configuration files. This section is mandatory."
                   % (str(err).replace(":", "", 1)))
            # NOTE(review): original presumably raised a project-specific
            # AllInOneError here — TODO confirm
            raise Exception(msg)
        return result

    ##############################################
    def getResultingSection(self, section, defaultDict={}, demandPars=[]):
        """Return `section` as a dict, starting from `defaultDict`;
        options in `demandPars` are mandatory (looked up in the section
        itself or in its 'local<Section>' counterpart)."""
        result = copy.deepcopy(defaultDict)
        for option in demandPars:
            try:
                result[option] = self.get(section, option)
            except ConfigParser.NoOptionError as globalSectionError:
                globalSection = str(globalSectionError).split("'")[-2]
                splittedSectionName = section.split(":")
                if len(splittedSectionName) > 1:
                    localSection = ("local"+section.split(":")[0].title()+":"
                                    + section.split(":")[1])
                else:
                    localSection = ("local"+section.split(":")[0].title())
                if self.has_section(localSection):
                    try:
                        result[option] = self.get(localSection, option)
                    except ConfigParser.NoOptionError as option:
                        msg = ("%s. This option is mandatory."
                               % (str(option).replace(":", "", 1).replace(
                                   "section", "section '"+globalSection+"' or", 1)))
                        raise Exception(msg)
                else:
                    msg = ("%s. This option is mandatory."
                           % (str(globalSectionError).replace(":", "", 1)))
                    raise Exception(msg)
        result = self._BetterConfigParser__updateDict(result, section)
        return result
##############################################
def ConfigSectionMap(config, section):
##############################################
    """Return `section` of `config` as an {option: value} dict; options
    that fail to parse are stored as None."""
    the_dict = {}
    options = config.options(section)
    for option in options:
        try:
            the_dict[option] = config.get(section, option)
            if the_dict[option] == -1:
                # NOTE(review): DebugPrint is not defined in this file; this
                # branch is unreachable for string values (get() returns str)
                # and any NameError is swallowed by the except below
                DebugPrint("skip: %s" % option)
        except Exception:
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
None 330 print(
"============== creating",out_path)
332 for dir
in out_path.split(
'/'):
333 newpath=os.path.join(newpath,dir)
335 if newpath.find(
'test_out') > 0:
337 command=
"eos mkdir "+newpath
338 p = subprocess.Popen(command,shell=
True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
339 (out, err) = p.communicate()
340 print(
"============== created ",out_path)
345 command2=
"eos ls "+out_path
346 p = subprocess.Popen(command2,shell=
True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
347 (out, err) = p.communicate()
##############################################
def split(sequence, size):
##############################################
    """Generator yielding consecutive chunks of `sequence`, each of at
    most `size` elements."""
    for i in range(0, len(sequence), size):
        yield sequence[i:i+size]
###########################################################################
class Job:
###########################################################################
    """One PVValidation batch job: builds its cmsRun cfg, LSF and bash
    scripts and knows how to submit itself to LSF."""

    def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir):

        # e.g. /HLTPhysics/Run2016C-...-v1/ALCARECO -> HLTPhysics_Run2016C
        theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None           # filled by submit()
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir = the_dir
        self.CMSSW_dir = CMSSW_dir

        self.output_full_name = self.getOutputBaseName()+"_"+str(self.job_id)
        self.output_number_name = self.getOutputBaseNameWithData()+"_"+str(self.job_number)

        self.cfg_dir = None
        self.outputCfgName = None

        # LSF/BASH variables
        self.LSF_dir = None
        self.BASH_dir = None
        self.output_LSF_name = None
        self.output_BASH_name = None

    ##############################################
    def setEOSout(self, theEOSdir):
        """Set the EOS output directory for this job."""
        self.OUTDIR = theEOSdir

    ##############################################
    def getOutputBaseName(self):
        """Return base name of the output files."""
        return "PVValidation_"+self.job_name

    ##############################################
    def getOutputBaseNameWithData(self):
        """Return base name of the output files, including the dataset tag."""
        return "PVValidation_"+self.job_name+"_"+self.data

    ##############################################
    def createTheCfgFile(self, lfn):
        """Write the cmsRun cfg for this job, filling the template
        PVValidation_T_cfg.py with this job's parameters and the input
        file list `lfn`."""
        global CopyRights

        # write the cfg file
        self.cfg_dir = os.path.join(self.the_dir, "cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName = self.output_full_name+"_cfg.py"
        fout = open(os.path.join(self.cfg_dir, self.outputCfgName), 'w')

        template_cfg_file = os.path.join(self.CMSSW_dir, "src/Alignment/OfflineValidation/test", "PVValidation_T_cfg.py")
        file = open(template_cfg_file, 'r')

        config_txt = '\n\n' + CopyRights + '\n\n'
        config_txt += file.read()
        config_txt = config_txt.replace("ISDATEMPLATE", self.isDA)
        config_txt = config_txt.replace("ISMCTEMPLATE", self.isMC)
        config_txt = config_txt.replace("APPLYBOWSTEMPLATE", self.applyBOWS)
        config_txt = config_txt.replace("EXTRACONDTEMPLATE", self.applyEXTRACOND)
        config_txt = config_txt.replace("USEFILELISTTEMPLATE", "True")
        config_txt = config_txt.replace("RUNBOUNDARYTEMPLATE", self.runboundary)
        config_txt = config_txt.replace("LUMILISTTEMPLATE", self.lumilist)
        config_txt = config_txt.replace("MAXEVENTSTEMPLATE", self.maxevents)
        config_txt = config_txt.replace("GLOBALTAGTEMPLATE", self.gt)
        config_txt = config_txt.replace("ALLFROMGTTEMPLATE", self.allFromGT)
        config_txt = config_txt.replace("ALIGNOBJTEMPLATE", self.alignmentDB)
        config_txt = config_txt.replace("GEOMTAGTEMPLATE", self.alignmentTAG)
        config_txt = config_txt.replace("APEOBJTEMPLATE", self.apeDB)
        config_txt = config_txt.replace("ERRORTAGTEMPLATE", self.apeTAG)
        config_txt = config_txt.replace("BOWSOBJECTTEMPLATE", self.bowDB)
        config_txt = config_txt.replace("BOWSTAGTEMPLATE", self.bowTAG)
        config_txt = config_txt.replace("VERTEXTYPETEMPLATE", self.vertextype)
        config_txt = config_txt.replace("TRACKTYPETEMPLATE", self.tracktype)
        config_txt = config_txt.replace("REFITTERTEMPLATE", self.refittertype)
        config_txt = config_txt.replace("TTRHBUILDERTEMPLATE", self.ttrhtype)
        config_txt = config_txt.replace("PTCUTTEMPLATE", self.ptcut)
        config_txt = config_txt.replace("INTLUMITEMPLATE", self.intlumi)
        config_txt = config_txt.replace("RUNCONTROLTEMPLATE", self.applyruncontrol)
        lfn_with_quotes = map(lambda x: "\'"+x+"\'", lfn)
        config_txt = config_txt.replace("FILESOURCETEMPLATE", "["+",".join(lfn_with_quotes)+"]")
        config_txt = config_txt.replace("OUTFILETEMPLATE", self.output_full_name+".root")

        ### now for the extra conditions
        textToWrite = ''
        for element in self.extraCondVect:
            if("Rcd" in element):
                params = self.extraCondVect[element].split(',')
                text = '''\n
          process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
               connect = cms.string('{database}'),
               toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
                                          tag = cms.string('{tag}'),
                                          label = cms.untracked.string('{label}')
                                         )
                                )
          )
          process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
          '''.format(record=element, database=params[0], tag=params[1], label=(params[2] if len(params) > 2 else ''))
                textToWrite += text

        if(self.applyEXTRACOND == "True"):
            if not self.extraCondVect:
                raise Exception('Requested extra conditions, but none provided')
            config_txt = config_txt.replace("END OF EXTRA CONDITIONS", textToWrite)
        else:
            print("INFO: Will not apply any extra conditions")

        fout.write(config_txt)

        file.close()
        fout.close()

    ##############################################
    def createTheLSFFile(self):
        """Write the LSF (bsub) script for this job."""
        # directory to store the LSF scripts
        self.LSF_dir = os.path.join(self.the_dir, "LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name = self.output_full_name+".lsf"
        fout = open(os.path.join(self.LSF_dir, self.output_LSF_name), 'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir, "log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/sh \n")
        fout.write("#BSUB -L /bin/sh\n")
        fout.write("#BSUB -J "+job_name+"\n")
        fout.write("#BSUB -o "+os.path.join(log_dir, job_name+".log")+"\n")
        fout.write("#BSUB -q cmscaf1nd \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=`pwd` \n")
        fout.write("cd "+os.path.join(self.CMSSW_dir, "src")+" \n")
        fout.write("eval `scram runtime -sh` \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cmsRun "+os.path.join(self.cfg_dir, self.outputCfgName)+" \n")
        fout.write("ls -lh . \n")
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

        fout.close()

    ##############################################
    def createTheBashFile(self, isUnitTest):
        """Write the bash script executed by the HTCondor job. When
        `isUnitTest` is true the copy-to-EOS step is skipped."""
        # directory to store the bash scripts
        self.BASH_dir = os.path.join(self.the_dir, "BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name = self.output_number_name+".sh"
        fout = open(os.path.join(self.BASH_dir, self.output_BASH_name), 'w')

        job_name = self.output_full_name

        fout.write("#!/bin/bash \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("echo \"Job started at \" `date` \n")
        fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir, "src")+" \n")
        fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=$PWD \n")
        fout.write("cd ${CMSSW_DIR} \n")
        fout.write("eval `scramv1 runtime -sh` \n")
        fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cp "+os.path.join(self.cfg_dir, self.outputCfgName)+" . \n")
        fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
        fout.write("cmsRun "+self.outputCfgName+" \n")
        fout.write("echo \"Content of working dir is:\" \n")
        fout.write("ls -lh | sort \n")
        if not isUnitTest:
            fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("echo \"Job ended at \" `date` \n")
        fout.write("exit 0 \n")

        fout.close()

    ##############################################
    def getOutputFileName(self):
        """Return the full EOS path of this job's ROOT output file."""
        return os.path.join(self.OUTDIR, self.output_full_name+".root")

    ##############################################
    def submit(self):
        """Submit the LSF script via bsub and remember the batch job id."""
        print("submit job", self.job_id)
        job_name = self.output_full_name
        submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir, self.output_LSF_name)
        child1 = os.system(submitcommand1)
        self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir, self.output_LSF_name))

    ##############################################
    def getBatchjobId(self):
        """Extract the numeric id from the 'Job <id> is submitted...' bsub reply."""
        return self.batch_job_id.split("<")[1].split(">")[0]
##############################################
def main():
##############################################
    """Entry point: parse options, determine the runs/files via DAS, write
    per-run cfg/bash scripts, optionally submit them to the batch system
    and prepare a harvesting (hadd) script.

    NOTE(review): reconstructed from a mangled source; gaps are filled with
    hedged assumptions marked TODO — confirm against the original script.
    """

    ## check first that a valid GRID proxy exists
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    forward_proxy(".")

    global CopyRights
    print('\n'+CopyRights)

    HOME = os.environ.get('HOME')

    # CMSSW section
    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
    lib_path = os.path.abspath(os.path.join(input_CMSSW_BASE, "src/Alignment/OfflineValidation/test"))
    sys.path.append(lib_path)

    ## command-line options
    desc = """This is a description of %prog."""
    parser = OptionParser(description=desc, version='%prog version 0.1')
    parser.add_option('-s', '--submit',     help='job submitted',       dest='submit',      action='store_true', default=False)
    parser.add_option('-j', '--jobname',    help='task name',           dest='taskname',    action='store',      default='myTask')
    parser.add_option('-D', '--dataset',    help='selected dataset',    dest='data',        action='store',      default='')
    parser.add_option('-r', '--doRunBased', help='selected dataset',    dest='doRunBased',  action='store_true', default=False)
    parser.add_option('-i', '--input',      help='set input configuration (overrides default)', dest='inputconfig', action='store', default=None)
    parser.add_option('-b', '--begin',      help='starting point',      dest='start',       action='store',      default='1')
    parser.add_option('-e', '--end',        help='ending point',        dest='end',         action='store',      default='999999')
    parser.add_option('-v', '--verbose',    help='verbose output',      dest='verbose',     action='store_true', default=False)
    parser.add_option('-u', '--unitTest',   help='unit tests?',         dest='isUnitTest',  action='store_true', default=False)
    parser.add_option('-I', '--instance',   help='DAS instance to use', dest='instance',    action='store',      default=None)
    (opts, args) = parser.parse_args()

    now = datetime.datetime.now()
    # TODO(review): exact timestamp format of the original not recoverable
    t = now.strftime("test_%Y_%m_%d_%H_%M_%S_DATA_") + opts.taskname

    USER = os.environ.get('USER')
    eosdir = os.path.join("/store/group/alca_trackeralign", USER, "test_out", t)

    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    #### workflow parameters (one entry per validation)
    jobName = []
    isMC = []
    isDA = []
    doRunBased = False
    maxevents = []

    gt = []
    allFromGT = []
    applyEXTRACOND = []
    conditions = []
    alignmentDB = []
    alignmentTAG = []
    apeDB = []
    apeTAG = []
    applyBOWS = []
    bowDB = []
    bowTAG = []
    vertextype = []
    tracktype = []
    refittertype = []
    ttrhtype = []
    applyruncontrol = []
    ptcut = []
    runboundary = []
    lumilist = []

    ConfigFile = opts.inputconfig

    if ConfigFile is not None:

        print("********************************************************")
        print("* Parsing from input file:", ConfigFile, " ")

        config = BetterConfigParser()
        config.read(ConfigFile)

        print("Parsed the following configuration \n\n")
        inputDict = as_dict(config)
        pprint.pprint(inputDict)

        if not inputDict:
            raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

        doRunBased = opts.doRunBased

        listOfValidations = config.getResultingSection("validations")

        for item in listOfValidations:
            if (bool(listOfValidations[item]) == True):

                jobName.append(item)
                # TODO(review): exact section/option names of the 'Job' block not
                # visible in the mangled source — confirm
                isDA.append(ConfigSectionMap(config, "Job")['isda'])
                isMC.append(ConfigSectionMap(config, "Job")['ismc'])
                maxevents.append(ConfigSectionMap(config, "Job")['maxevents'])

                gt.append(ConfigSectionMap(config, "Conditions:"+item)['gt'])
                allFromGT.append(ConfigSectionMap(config, "Conditions:"+item)['allfromgt'])
                applyEXTRACOND.append(ConfigSectionMap(config, "Conditions:"+item)['applyextracond'])
                conditions.append(config.getResultingSection("ExtraConditions"))

                alignmentDB.append(ConfigSectionMap(config, "Conditions:"+item)['alignmentdb'])
                alignmentTAG.append(ConfigSectionMap(config, "Conditions:"+item)['alignmenttag'])
                apeDB.append(ConfigSectionMap(config, "Conditions:"+item)['apedb'])
                apeTAG.append(ConfigSectionMap(config, "Conditions:"+item)['apetag'])
                applyBOWS.append(ConfigSectionMap(config, "Conditions:"+item)['applybows'])
                bowDB.append(ConfigSectionMap(config, "Conditions:"+item)['bowdb'])
                bowTAG.append(ConfigSectionMap(config, "Conditions:"+item)['bowtag'])

                vertextype.append(ConfigSectionMap(config, "Type")['vertextype'])
                tracktype.append(ConfigSectionMap(config, "Type")['tracktype'])

                ## in case there exists a specification for the refitter
                if(config.exists("Refit", "refittertype")):
                    refittertype.append(ConfigSectionMap(config, "Refit")['refittertype'])
                else:
                    # RefitType is provided by the module directory appended to sys.path above
                    refittertype.append(str(RefitType.COMMON))

                if(config.exists("Refit", "ttrhtype")):
                    ttrhtype.append(ConfigSectionMap(config, "Refit")['ttrhtype'])
                else:
                    ttrhtype.append("WithAngleAndTemplate")

                applyruncontrol.append(ConfigSectionMap(config, "Selection")['applyruncontrol'])
                ptcut.append(ConfigSectionMap(config, "Selection")['ptcut'])
                runboundary.append(ConfigSectionMap(config, "Selection")['runboundary'])
                lumilist.append(ConfigSectionMap(config, "Selection")['lumilist'])
    else:

        print("********************************************************")
        print("* Parsing from command line                            *")
        print("********************************************************")

        jobName = ['testing']
        isDA = ['True']
        isMC = ['True']
        doRunBased = opts.doRunBased
        maxevents = ['10000']

        gt = ['74X_dataRun2_Prompt_v4']
        allFromGT = ['False']
        applyEXTRACOND = ['False']
        conditions = [[('SiPixelTemplateDBObjectRcd',
                        'frontier://FrontierProd/CMS_CONDITIONS',
                        'SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
        alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
        alignmentTAG = ['TrackerAlignment_Prompt']
        apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
        apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
        applyBOWS = ['True']
        bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
        bowTAG = ['TrackerSurafceDeformations_v1_express']

        vertextype = ['offlinePrimaryVertices']
        tracktype = ['ALCARECOTkAlMinBias']

        applyruncontrol = ['False']
        ptcut = ['3']
        runboundary = ['1']
        lumilist = ['']

    # print out the configuration
    print("********************************************************")
    print("* Configuration info *")
    print("********************************************************")
    print("- submitted   : ", opts.submit)
    print("- taskname    : ", opts.taskname)
    print("- Jobname     : ", jobName)
    print("- use DA      : ", isDA)
    print("- is MC       : ", isMC)
    print("- is run-based: ", doRunBased)
    print("- evts/job    : ", maxevents)
    print("- GlobatTag   : ", gt)
    print("- allFromGT?  : ", allFromGT)
    print("- extraCond?  : ", applyEXTRACOND)
    print("- extraCond   : ", conditions)
    print("- Align db    : ", alignmentDB)
    print("- Align tag   : ", alignmentTAG)
    print("- APE db      : ", apeDB)
    print("- APE tag     : ", apeTAG)
    print("- use bows?   : ", applyBOWS)
    print("- K&B db      : ", bowDB)
    print("- K&B tag     : ", bowTAG)
    print("- VertexColl  : ", vertextype)
    print("- TrackColl   : ", tracktype)
    print("- RefitterSeq : ", refittertype)
    print("- TTRHBuilder : ", ttrhtype)
    print("- RunControl? : ", applyruncontrol)
    print("- Pt>         : ", ptcut)
    print("- run=        : ", runboundary)
    print("- JSON        : ", lumilist)
    print("- Out Dir     : ", eosdir)
    print("********************************************************")
    print("Will run on", len(jobName), "workflows")

    myRuns = []
    mytuple = []
    od = collections.OrderedDict()
    myLumiDB = {}

    if doRunBased:
        print(">>>> This is Data!")
        print(">>>> Doing run based selection")
        cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + \
              (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
        p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
        out, err = p.communicate()
        listOfRuns = out.decode().split("\n")
        if listOfRuns and listOfRuns[-1] == '':
            listOfRuns.pop()
        listOfRuns.sort()
        print("Will run on ", len(listOfRuns), "runs: \n", listOfRuns)

        print("first run:", opts.start, "last run:", opts.end)

        for run in listOfRuns:
            if (int(run) < int(opts.start) or int(run) > int(opts.end)):
                print("excluding", run)
                continue
            # TODO(review): JSON mask lookup assumed on first workflow's lumilist
            if not isInJSON(run, lumilist[0]):
                continue
            print("'======> taking", run)
            mytuple.append((run, opts.data))

        instances = [opts.instance for entry in mytuple]
        pool = multiprocessing.Pool(processes=20)
        count = pool.map(getFilesForRun, zip(mytuple, instances))
        file_info = dict(zip(listOfRuns, count))

        for run in listOfRuns:
            if (int(run) < int(opts.start) or int(run) > int(opts.end)):
                print('rejecting run', run, ' becasue outside of boundaries')
                continue
            if not isInJSON(run, lumilist[0]):
                print('rejecting run', run, ' becasue outside not in JSON')
                continue
            myRuns.append(run)

        od = collections.OrderedDict(sorted(file_info.items()))

        if len(myRuns) == 0:
            if opts.isUnitTest:
                print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
                sys.exit(0)
            else:
                raise Exception('Will not run on any run.... please check again the configuration')
        else:
            # get from the DB the int luminosities
            myLumiDB = getLuminosity(HOME, myRuns[0], myRuns[-1], doRunBased, opts.verbose)

        if opts.verbose:
            pprint.pprint(myLumiDB)

    # start loop on the workflows
    for iConf in range(len(jobName)):
        print("This is Task n.", iConf+1, "of", len(jobName))

        # for hadd script
        scripts_dir = "scripts"
        if not os.path.exists(scripts_dir):
            os.makedirs(scripts_dir)
        hadd_script_file = os.path.join(scripts_dir, jobName[iConf]+"_"+opts.taskname+".sh")
        fout = open(hadd_script_file, 'w')

        output_file_list1 = list()
        output_file_list2 = list()
        output_file_list2.append("hadd ")

        inputFiles = []

        if (to_bool(isMC[iConf]) or (not doRunBased)):
            if (to_bool(isMC[iConf])):
                print("this is MC")
                cmd = 'dasgoclient -query \'file dataset='+opts.data + \
                      (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
                s = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
                out, err = s.communicate()
                mylist = out.decode().split('\n')
                if mylist and mylist[-1] == '':
                    mylist.pop()

                splitList = split(mylist, 10)
                for files in splitList:
                    inputFiles.append(files)
                    myRuns.append(str(1))
            else:
                print("this is DATA (not doing full run-based selection)")
                print(runboundary[iConf])
                cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf] + \
                      (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
                s = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
                out, err = s.communicate()
                mylist = out.decode().split('\n')
                if mylist and mylist[-1] == '':
                    mylist.pop()
                print("mylist:", mylist)

                splitList = split(mylist, 10)
                for files in splitList:
                    inputFiles.append(files)
                    myRuns.append(str(runboundary[iConf]))

                myLumiDB = getLuminosity(HOME, myRuns[0], myRuns[-1], True, opts.verbose)
        else:
            # run-based data: one job per selected run
            for element in od:
                inputFiles.append(od[element])

        if opts.verbose:
            print("myRuns =====>", myRuns)

        totalJobs = 0
        batchJobIds = []
        theBashDir = None
        theBaseName = None
        mergedFile = None

        for jobN, theSrcFiles in enumerate(inputFiles):
            if opts.verbose:
                print("JOB:", jobN, "run", myRuns[jobN], theSrcFiles)
            else:
                print("JOB:", jobN, "run", myRuns[jobN])

            thejobIndex = None
            theLumi = '1'

            if (to_bool(isMC[iConf])):
                thejobIndex = jobN
            else:
                if doRunBased:
                    thejobIndex = myRuns[jobN]
                else:
                    thejobIndex = myRuns[jobN]+"_"+str(jobN)

                if (myRuns[jobN]) in myLumiDB:
                    theLumi = myLumiDB[myRuns[jobN]]
                else:
                    print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
                    theLumi = '1'
                print("int. lumi:", theLumi, "/pb")

            # save the run info in a dictionary for the local run DB
            runInfo = {}
            runInfo["run"] = myRuns[jobN]
            runInfo["conf"] = jobName[iConf]
            runInfo["gt"] = gt[iConf]
            runInfo["allFromGT"] = allFromGT[iConf]
            runInfo["alignmentDB"] = alignmentDB[iConf]
            runInfo["alignmentTag"] = alignmentTAG[iConf]
            runInfo["apeDB"] = apeDB[iConf]
            runInfo["apeTag"] = apeTAG[iConf]
            runInfo["applyBows"] = applyBOWS[iConf]
            runInfo["bowDB"] = bowDB[iConf]
            runInfo["bowTag"] = bowTAG[iConf]
            runInfo["ptCut"] = ptcut[iConf]
            runInfo["lumilist"] = lumilist[iConf]
            runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
            runInfo["conditions"] = conditions[iConf]
            runInfo["nfiles"] = len(theSrcFiles)
            runInfo["srcFiles"] = theSrcFiles
            runInfo["intLumi"] = theLumi

            updateDB(((iConf+1)*10)+(jobN+1), runInfo)

            totalJobs = totalJobs+1

            aJob = Job(opts.data,
                       jobN,
                       thejobIndex,
                       jobName[iConf], isDA[iConf], isMC[iConf],
                       applyBOWS[iConf], applyEXTRACOND[iConf], conditions[iConf],
                       myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
                       gt[iConf], allFromGT[iConf],
                       alignmentDB[iConf], alignmentTAG[iConf],
                       apeDB[iConf], apeTAG[iConf],
                       bowDB[iConf], bowTAG[iConf],
                       vertextype[iConf], tracktype[iConf],
                       refittertype[iConf], ttrhtype[iConf],
                       applyruncontrol[iConf],
                       ptcut[iConf], input_CMSSW_BASE, os.getcwd())

            aJob.setEOSout(eosdir)
            aJob.createTheCfgFile(theSrcFiles)
            aJob.createTheBashFile(opts.isUnitTest)

            output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
            if jobN == 0:
                theBashDir = aJob.BASH_dir
                theBaseName = aJob.getOutputBaseNameWithData()
                mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
                output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
            output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
            del aJob

        theLogDir = os.path.join(os.getcwd(), "log")
        if not os.path.exists(theLogDir):
            os.makedirs(theLogDir)

        job_submit_file = write_HTCondor_submit_file(theBashDir, theLogDir, theBaseName, totalJobs, None)
        os.system("chmod u+x "+theBashDir+"/*.sh")

        if opts.submit:
            submissionCommand = "condor_submit "+job_submit_file
            submissionOutput = getCommandOutput(submissionCommand)
            print(submissionOutput)

        # write the harvesting (hadd) script
        fout.write("#!/bin/bash \n")
        fout.write("MAIL=$USER@mail.cern.ch \n")
        fout.write("OUT_DIR="+eosdir+"\n")
        fout.write("FILE="+str(mergedFile)+"\n")
        fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
        fout.write("cd "+os.path.join(input_CMSSW_BASE, "src")+"\n")
        fout.write("eval `scram r -sh` \n")
        fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
        fout.writelines(output_file_list1)
        fout.writelines(output_file_list2)
        fout.write("\n")
        fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
        fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
        fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " + opts.taskname + " completed\" $MAIL \n")
        fout.close()

        os.system("chmod u+x "+hadd_script_file)

        if opts.submit:
            # chain the harvesting step after all batch jobs have ended
            harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
            print(harvest_conditions)
            lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
            print(lastJobCommand)
            lastJobOutput = getCommandOutput(lastJobCommand)
            print(lastJobOutput)

        del output_file_list1


###################################################
if __name__ == "__main__":
    main()
1117 def write_HTCondor_submit_file(path, logs, name, nruns, proxy_path=None)
def forward_proxy(rundir)
def isInJSON(run, jsonfile)
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def getNEvents(run, dataset)
ALPAKA_FN_HOST_ACC ALPAKA_FN_INLINE constexpr float zip(ConstView const &tracks, int32_t i)
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def createTheCfgFile(self, lfn)
def createTheLSFFile(self)
def getCommandOutput(command)
def optionxform(self, optionstr)
def mkdir_eos(out_path)
method to create recursively directories on EOS #############
def getOutputBaseName(self)
def createTheBashFile(self, isUnitTest)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def __updateDict(self, dictionary, section)
def split(sequence, size)
def updateDB(run, runInfo)
static std::string join(char **cmd)
def getOutputFileName(self)
def exists(self, section, option)
def getOutputBaseNameWithData(self)
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
def setEOSout(self, theEOSdir)