3 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows, 6 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r 9 from __future__
import print_function
10 from builtins
import range
12 __author__ =
'Marco Musich' 13 __copyright__ =
'Copyright 2020, CERN CMS' 14 __credits__ = [
'Ernesto Migliore',
'Salvatore Di Guida']
15 __license__ =
'Unknown' 16 __maintainer__ =
'Marco Musich' 17 __email__ =
'marco.musich@cern.ch' 25 import configparser
as ConfigParser
29 from optparse
import OptionParser
30 from subprocess
import Popen, PIPE
34 import multiprocessing
41 CopyRights =
'##################################\n' 42 CopyRights +=
'# submitPVValidationJobs.py #\n' 43 CopyRights +=
'# marco.musich@cern.ch #\n' 44 CopyRights +=
'# April 2020 #\n' 45 CopyRights +=
'##################################\n' 50 """Check if GRID proxy has been initialized.""" 53 with open(os.devnull,
"w")
as dump:
54 subprocess.check_call([
"voms-proxy-info",
"--exists"],
55 stdout = dump, stderr = dump)
56 except subprocess.CalledProcessError:
63 """Forward proxy to location visible from the batch system. 65 - `rundir`: directory for storing the forwarded proxy 69 print(
"Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
72 local_proxy = subprocess.check_output([
"voms-proxy-info",
"--path"]).
strip()
73 shutil.copyfile(local_proxy, os.path.join(rundir,
".user_proxy"))
78 """Writes 'job.submit' file in `path`. 80 - `path`: job directory 81 - `script`: script to be executed 82 - `proxy_path`: path to proxy (only used in case of requested proxy forward) 85 job_submit_template=
"""\ 87 requirements = (OpSysAndVer =?= "CentOS7") 88 executable = {script:s} 89 output = {jobm:s}/{out:s}.out 90 error = {jobm:s}/{out:s}.err 91 log = {jobm:s}/{out:s}.log 92 transfer_output_files = "" 93 +JobFlavour = "{flavour:s}" 96 if proxy_path
is not None:
97 job_submit_template +=
"""\ 98 +x509userproxy = "{proxy:s}" 101 job_submit_file = os.path.join(path,
"job_"+name+
".submit")
102 with open(job_submit_file,
"w")
as f:
103 f.write(job_submit_template.format(script = os.path.join(path,name+
"_$(ProcId).sh"),
104 out = name+
"_$(ProcId)",
105 jobm = os.path.abspath(path),
106 flavour =
"tomorrow",
110 return job_submit_file
115 """This function executes `command` and returns its output. 117 - `command`: Shell command to be invoked by this function. 119 child = os.popen(command)
123 print(
'%s failed w/ exit code %d' % (command, err))
129 cmd2 =
' dasgoclient -limit=0 -query \'file run='+blob[0][0]+
' dataset='+blob[0][1]+ (
' instance='+blob[1]+
'\'' if (blob[1]
is not None)
else '\'')
131 q = Popen(cmd2 , shell=
True, stdout=PIPE, stderr=PIPE)
132 out, err = q.communicate()
134 outputList = out.decode().
split(
'\n')
141 nEvents = subprocess.check_output([
"das_client",
"--limit",
"0",
"--query",
"summary run={} dataset={} | grep summary.nevents".
format(run, dataset)])
142 return 0
if nEvents ==
"[]\n" else int(nEvents)
147 """Expects something like 148 +-------+------+--------+--------+-------------------+------------------+ 149 | nfill | nrun | nls | ncms | totdelivered(/fb) | totrecorded(/fb) | 150 +-------+------+--------+--------+-------------------+------------------+ 151 | 73 | 327 | 142418 | 138935 | 19.562 | 18.036 | 152 +-------+------+--------+--------+-------------------+------------------+ 153 And extracts the total recorded luminosity (/b). 162 output = subprocess.check_output([homedir+
"/.local/bin/brilcalc",
"lumi",
"-b",
"STABLE BEAMS",
"-u",
"/pb",
"--begin",
str(minRun),
"--end",
str(maxRun),
"--output-style",
"csv",
"-c",
"web"])
164 warnings.warn(
'ATTENTION! Impossible to query the BRIL DB!')
168 print(
"INSIDE GET LUMINOSITY")
171 for line
in output.decode().
split(
"\n"):
172 if (
"#" not in line):
173 runToCache = line.split(
",")[0].
split(
":")[0]
174 lumiToCache = line.split(
",")[-1].
replace(
"\r",
"")
177 myCachedLumi[runToCache] = lumiToCache
187 with open(jsonfile,
'r') as myJSON: jsonDATA = json.load(myJSON) 188 return (run
in jsonDATA)
190 warnings.warn(
'ATTENTION! Impossible to find lumi mask! All runs will be used.')
197 for section
in config.sections():
198 dictionary[section] = {}
199 for option
in config.options(section):
200 dictionary[section][option] = config.get(section, option)
208 Converts 'something' to boolean. Raises exception for invalid formats 209 Possible True values: 1, True, "1", "TRue", "yes", "y", "t" 210 Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ... 212 if str(value).lower()
in (
"yes",
"y",
"true",
"t",
"1"):
return True 213 if str(value).lower()
in (
"no",
"n",
"false",
"f",
"0",
"0.0",
"",
"none",
"[]",
"{}"):
return False 214 raise Exception(
'Invalid value for boolean conversion: ' +
str(value))
219 dbName =
"runInfo.pkl" 221 if os.path.exists(dbName):
222 with open(dbName,
'rb')
as f:
223 infos = pickle.load(f)
225 for f
in glob.glob(
"root-files/Run*.root"):
226 run = runFromFilename(f)
229 infos[run][
"start_time"] = getRunStartTime(run)
232 with open(dbName,
"wb")
as f:
233 pickle.dump(infos, f)
238 dbName =
"runInfo.pkl" 240 if os.path.exists(dbName):
241 with open(dbName,
'rb')
as f:
242 infos = pickle.load(f)
247 with open(dbName,
"wb")
as f:
248 pickle.dump(infos, f)
258 def exists( self, section, option):
260 items = self.items(section)
261 except ConfigParser.NoSectionError:
264 if item[0] == option:
272 for option
in self.options( section ):
273 result[option] = self.get( section, option )
274 if "local"+section.title()
in self.sections():
275 for option
in self.options(
"local"+section.title() ):
276 result[option] = self.get(
"local"+section.title(),option )
277 except ConfigParser.NoSectionError
as section:
278 msg = (
"%s in configuration files. This section is mandatory." 285 result = copy.deepcopy(defaultDict)
286 for option
in demandPars:
288 result[option] = self.get( section, option )
289 except ConfigParser.NoOptionError
as globalSectionError:
290 globalSection =
str( globalSectionError ).
split(
"'" )[-2]
291 splittedSectionName = section.split(
":" )
292 if len( splittedSectionName ) > 1:
293 localSection = (
"local"+section.split(
":" )[0].
title()+
":" 294 +section.split(
":")[1])
296 localSection = (
"local"+section.split(
":" )[0].
title())
297 if self.has_section( localSection ):
299 result[option] = self.get( localSection, option )
300 except ConfigParser.NoOptionError
as option:
301 msg = (
"%s. This option is mandatory." 304 "section '"+globalSection+
"' or", 1)))
307 msg = (
"%s. This option is mandatory." 308 %(
str(globalSectionError).
replace(
":",
"", 1)))
317 options = config.options(section)
318 for option
in options:
320 the_dict[option] = config.get(section, option)
321 if the_dict[option] == -1:
322 DebugPrint(
"skip: %s" % option)
324 print(
"exception on %s!" % option)
325 the_dict[option] =
None 332 for dir
in out_path.split(
'/'):
333 newpath=os.path.join(newpath,dir)
335 if newpath.find(
'test_out') > 0:
337 command=
"/afs/cern.ch/project/eos/installation/cms/bin/eos.select mkdir "+newpath
338 p = subprocess.Popen(command,shell=
True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
339 (out, err) = p.communicate()
344 command2=
"/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls "+out_path
345 p = subprocess.Popen(command2,shell=
True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
346 (out, err) = p.communicate()
def split(sequence, size):
    """Yield successive chunks of `sequence`, each holding at most `size` items.

    - `sequence`: any object supporting `len()` and slicing (list, str, ...)
    - `size`: chunk length; the last chunk is shorter when `len(sequence)`
      is not a multiple of `size`
    """
    for start in range(0, len(sequence), size):
        yield sequence[start:start + size]
364 def __init__(self,dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir ,the_dir):
367 theDataSet = dataset.split(
"/")[1]+
"_"+(dataset.split(
"/")[2]).
split(
"-")[0]
369 self.data = theDataSet
370 self.job_number = job_number
372 self.batch_job_id =
None 373 self.job_name = job_name
377 self.applyBOWS = applyBOWS
378 self.applyEXTRACOND = applyEXTRACOND
379 self.extraCondVect = extraconditions
380 self.runboundary = runboundary
381 self.lumilist = lumilist
382 self.intlumi = intlumi
383 self.maxevents = maxevents
385 self.allFromGT = allFromGT
386 self.alignmentDB = alignmentDB
387 self.alignmentTAG = alignmentTAG
392 self.vertextype = vertextype
393 self.tracktype = tracktype
394 self.refittertype = refittertype
395 self.ttrhtype = ttrhtype
396 self.applyruncontrol = applyruncontrol
400 self.CMSSW_dir=CMSSW_dir
402 self.output_full_name=self.getOutputBaseName()+
"_"+
str(self.job_id)
403 self.output_number_name=self.getOutputBaseNameWithData()+
"_"+
str(self.job_number)
406 self.outputCfgName=
None 411 self.output_LSF_name=
None 412 self.output_BASH_name=
None 422 self.OUTDIR = theEOSdir
426 return "PVValidation_"+self.job_name
430 return "PVValidation_"+self.job_name+
"_"+self.data
438 self.cfg_dir = os.path.join(self.the_dir,
"cfg")
439 if not os.path.exists(self.cfg_dir):
440 os.makedirs(self.cfg_dir)
442 self.outputCfgName=self.output_full_name+
"_cfg.py" 443 fout=open(os.path.join(self.cfg_dir,self.outputCfgName),
'w')
445 template_cfg_file = os.path.join(self.the_dir,
"PVValidation_T_cfg.py")
447 fin = open(template_cfg_file)
449 config_txt =
'\n\n' + CopyRights +
'\n\n' 450 config_txt += fin.read()
452 config_txt=config_txt.replace(
"ISDATEMPLATE",self.isDA)
453 config_txt=config_txt.replace(
"ISMCTEMPLATE",self.isMC)
454 config_txt=config_txt.replace(
"APPLYBOWSTEMPLATE",self.applyBOWS)
455 config_txt=config_txt.replace(
"EXTRACONDTEMPLATE",self.applyEXTRACOND)
456 config_txt=config_txt.replace(
"USEFILELISTTEMPLATE",
"True")
457 config_txt=config_txt.replace(
"RUNBOUNDARYTEMPLATE",self.runboundary)
458 config_txt=config_txt.replace(
"LUMILISTTEMPLATE",self.lumilist)
459 config_txt=config_txt.replace(
"MAXEVENTSTEMPLATE",self.maxevents)
460 config_txt=config_txt.replace(
"GLOBALTAGTEMPLATE",self.gt)
461 config_txt=config_txt.replace(
"ALLFROMGTTEMPLATE",self.allFromGT)
462 config_txt=config_txt.replace(
"ALIGNOBJTEMPLATE",self.alignmentDB)
463 config_txt=config_txt.replace(
"GEOMTAGTEMPLATE",self.alignmentTAG)
464 config_txt=config_txt.replace(
"APEOBJTEMPLATE",self.apeDB)
465 config_txt=config_txt.replace(
"ERRORTAGTEMPLATE",self.apeTAG)
466 config_txt=config_txt.replace(
"BOWSOBJECTTEMPLATE",self.bowDB)
467 config_txt=config_txt.replace(
"BOWSTAGTEMPLATE",self.bowTAG)
468 config_txt=config_txt.replace(
"VERTEXTYPETEMPLATE",self.vertextype)
469 config_txt=config_txt.replace(
"TRACKTYPETEMPLATE",self.tracktype)
470 config_txt=config_txt.replace(
"REFITTERTEMPLATE",self.refittertype)
471 config_txt=config_txt.replace(
"TTRHBUILDERTEMPLATE",self.ttrhtype)
472 config_txt=config_txt.replace(
"PTCUTTEMPLATE",self.ptcut)
473 config_txt=config_txt.replace(
"INTLUMITEMPLATE",self.intlumi)
474 config_txt=config_txt.replace(
"RUNCONTROLTEMPLATE",self.applyruncontrol)
475 lfn_with_quotes =
map(
lambda x:
"\'"+x+
"\'",lfn)
476 config_txt=config_txt.replace(
"FILESOURCETEMPLATE",
"["+
",".
join(lfn_with_quotes)+
"]")
477 config_txt=config_txt.replace(
"OUTFILETEMPLATE",self.output_full_name+
".root")
479 fout.write(config_txt)
481 for line
in fin.readlines():
483 if 'END OF EXTRA CONDITIONS' in line:
484 for element
in self.extraCondVect :
485 if(
"Rcd" in element):
486 params = self.extraCondVect[element].
split(
',')
489 fout.write(
" process.conditionsIn"+element+
"= CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone( \n")
490 fout.write(
" connect = cms.string('"+params[0]+
"'), \n")
491 fout.write(
" toGet = cms.VPSet(cms.PSet(record = cms.string('"+element+
"'), \n")
492 fout.write(
" tag = cms.string('"+params[1]+
"'), \n")
494 fout.write(
" label = cms.untracked.string('"+params[2]+
"') \n")
498 fout.write(
" process.prefer_conditionsIn"+element+
" = cms.ESPrefer(\"PoolDBESSource\", \"conditionsIn"+element[0]+
"\") \n \n")
506 self.LSF_dir = os.path.join(self.the_dir,
"LSF")
507 if not os.path.exists(self.LSF_dir):
508 os.makedirs(self.LSF_dir)
510 self.output_LSF_name=self.output_full_name+
".lsf" 511 fout=open(os.path.join(self.LSF_dir,self.output_LSF_name),
'w')
513 job_name = self.output_full_name
515 log_dir = os.path.join(self.the_dir,
"log")
516 if not os.path.exists(log_dir):
519 fout.write(
"#!/bin/sh \n")
520 fout.write(
"#BSUB -L /bin/sh\n")
521 fout.write(
"#BSUB -J "+job_name+
"\n")
522 fout.write(
"#BSUB -o "+os.path.join(log_dir,job_name+
".log")+
"\n")
523 fout.write(
"#BSUB -q cmscaf1nd \n")
524 fout.write(
"JobName="+job_name+
" \n")
525 fout.write(
"OUT_DIR="+self.OUTDIR+
" \n")
526 fout.write(
"LXBATCH_DIR=`pwd` \n")
527 fout.write(
"cd "+os.path.join(self.CMSSW_dir,
"src")+
" \n")
528 fout.write(
"eval `scram runtime -sh` \n")
529 fout.write(
"cd $LXBATCH_DIR \n")
530 fout.write(
"cmsRun "+os.path.join(self.cfg_dir,self.outputCfgName)+
" \n")
531 fout.write(
"ls -lh . \n")
532 fout.write(
"for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
533 fout.write(
"for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")
542 self.BASH_dir = os.path.join(self.the_dir,
"BASH")
543 if not os.path.exists(self.BASH_dir):
544 os.makedirs(self.BASH_dir)
546 self.output_BASH_name=self.output_number_name+
".sh" 547 fout=open(os.path.join(self.BASH_dir,self.output_BASH_name),
'w')
549 job_name = self.output_full_name
551 log_dir = os.path.join(self.the_dir,
"log")
552 if not os.path.exists(log_dir):
555 fout.write(
"#!/bin/bash \n")
557 fout.write(
"JobName="+job_name+
" \n")
558 fout.write(
"echo \"Job started at \" `date` \n")
559 fout.write(
"CMSSW_DIR="+os.path.join(self.CMSSW_dir,
"src")+
" \n")
560 fout.write(
"export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
561 fout.write(
"OUT_DIR="+self.OUTDIR+
" \n")
562 fout.write(
"LXBATCH_DIR=$PWD \n")
564 fout.write(
"cd ${CMSSW_DIR} \n")
565 fout.write(
"eval `scramv1 runtime -sh` \n")
566 fout.write(
"echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
567 fout.write(
"cd $LXBATCH_DIR \n")
568 fout.write(
"cp "+os.path.join(self.cfg_dir,self.outputCfgName)+
" . \n")
569 fout.write(
"echo \"cmsRun "+self.outputCfgName+
"\" \n")
570 fout.write(
"cmsRun "+self.outputCfgName+
" \n")
571 fout.write(
"echo \"Content of working dir is \"`ls -lh` \n")
573 fout.write(
"for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
575 fout.write(
"echo \"Job ended at \" `date` \n")
576 fout.write(
"exit 0 \n")
582 return os.path.join(self.OUTDIR,self.output_full_name+
".root")
586 print(
"submit job", self.job_id)
587 job_name = self.output_full_name
588 submitcommand1 =
"chmod u+x " + os.path.join(self.LSF_dir,self.output_LSF_name)
589 child1 = os.system(submitcommand1)
592 self.batch_job_id =
getCommandOutput(
"bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name))
596 return self.batch_job_id.
split(
"<")[1].
split(
">")[0]
604 print(
"Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
611 print(
'\n'+CopyRights)
613 HOME = os.environ.get(
'HOME')
616 input_CMSSW_BASE = os.environ.get(
'CMSSW_BASE')
617 AnalysisStep_dir = os.path.join(input_CMSSW_BASE,
"src/Alignment/OfflineValidation/test")
618 lib_path = os.path.abspath(AnalysisStep_dir)
619 sys.path.append(lib_path)
624 desc=
"""This is a description of %prog.""" 625 parser = OptionParser(description=desc,version=
'%prog version 0.1')
626 parser.add_option(
'-s',
'--submit', help=
'job submitted', dest=
'submit', action=
'store_true', default=
False)
627 parser.add_option(
'-j',
'--jobname', help=
'task name', dest=
'taskname', action=
'store', default=
'myTask')
628 parser.add_option(
'-D',
'--dataset', help=
'selected dataset', dest=
'data', action=
'store', default=
'')
629 parser.add_option(
'-r',
'--doRunBased',help=
'selected dataset', dest=
'doRunBased', action=
'store_true' , default=
False)
630 parser.add_option(
'-i',
'--input', help=
'set input configuration (overrides default)', dest=
'inputconfig',action=
'store',default=
None)
631 parser.add_option(
'-b',
'--begin', help=
'starting point', dest=
'start', action=
'store', default=
'1')
632 parser.add_option(
'-e',
'--end', help=
'ending point', dest=
'end', action=
'store', default=
'999999')
633 parser.add_option(
'-v',
'--verbose', help=
'verbose output', dest=
'verbose', action=
'store_true', default=
False)
634 parser.add_option(
'-u',
'--unitTest', help=
'unit tests?', dest=
'isUnitTest', action=
'store_true', default=
False)
635 parser.add_option(
'-I',
'--instance', help=
'DAS instance to use', dest=
'instance', action=
'store', default=
None)
636 (opts, args) = parser.parse_args()
638 now = datetime.datetime.now()
646 USER = os.environ.get(
'USER')
647 eosdir=os.path.join(
"/store/group/alca_trackeralign",USER,
"test_out",t)
652 print(
"Not going to create EOS folder. -s option has not been chosen")
685 ConfigFile = opts.inputconfig
687 if ConfigFile
is not None:
689 print(
"********************************************************")
690 print(
"* Parsing from input file:", ConfigFile,
" ")
693 config.read(ConfigFile)
695 print(
"Parsed the following configuration \n\n")
697 pprint.pprint(inputDict)
700 raise SystemExit(
"\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
707 doRunBased = opts.doRunBased
709 listOfValidations = config.getResultingSection(
"validations")
711 for item
in listOfValidations:
712 if (
bool(listOfValidations[item]) ==
True):
721 applyEXTRACOND.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'applyextracond'])
722 conditions.append(config.getResultingSection(
"ExtraConditions"))
724 alignmentDB.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'alignmentdb'])
725 alignmentTAG.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'alignmenttag'])
737 if(config.exists(
"Refit",
"refittertype")):
740 refittertype.append(
str(RefitType.COMMON))
742 if(config.exists(
"Refit",
"ttrhtype")):
745 ttrhtype.append(
"WithAngleAndTemplate")
747 applyruncontrol.append(
ConfigSectionMap(config,
"Selection")[
'applyruncontrol'])
753 print(
"********************************************************")
754 print(
"* Parsing from command line *")
755 print(
"********************************************************")
757 jobName = [
'testing']
760 doRunBased = opts.doRunBased
761 maxevents = [
'10000']
763 gt = [
'74X_dataRun2_Prompt_v4']
764 allFromGT = [
'False']
765 applyEXTRACOND = [
'False']
766 conditions = [[(
'SiPixelTemplateDBObjectRcd',
'frontier://FrontierProd/CMS_CONDITIONS',
'SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
767 alignmentDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
768 alignmentTAG = [
'TrackerAlignment_Prompt']
769 apeDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
770 apeTAG = [
'TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
772 bowDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
773 bowTAG = [
'TrackerSurafceDeformations_v1_express']
775 vertextype = [
'offlinePrimaryVertices']
776 tracktype = [
'ALCARECOTkAlMinBias']
778 applyruncontrol = [
'False']
785 print(
"********************************************************")
786 print(
"* Configuration info *")
787 print(
"********************************************************")
788 print(
"- submitted : ",opts.submit)
789 print(
"- taskname : ",opts.taskname)
790 print(
"- Jobname : ",jobName)
791 print(
"- use DA : ",isDA)
792 print(
"- is MC : ",isMC)
793 print(
"- is run-based: ",doRunBased)
794 print(
"- evts/job : ",maxevents)
795 print(
"- GlobatTag : ",gt)
796 print(
"- allFromGT? : ",allFromGT)
797 print(
"- extraCond? : ",applyEXTRACOND)
798 print(
"- extraCond : ",conditions)
799 print(
"- Align db : ",alignmentDB)
800 print(
"- Align tag : ",alignmentTAG)
801 print(
"- APE db : ",apeDB)
802 print(
"- APE tag : ",apeTAG)
803 print(
"- use bows? : ",applyBOWS)
804 print(
"- K&B db : ",bowDB)
805 print(
"- K&B tag : ",bowTAG)
806 print(
"- VertexColl : ",vertextype)
807 print(
"- TrackColl : ",tracktype)
808 print(
"- RefitterSeq : ",refittertype)
809 print(
"- TTRHBuilder : ",ttrhtype)
810 print(
"- RunControl? : ",applyruncontrol)
811 print(
"- Pt> ",ptcut)
812 print(
"- run= ",runboundary)
813 print(
"- JSON : ",lumilist)
814 print(
"- Out Dir : ",eosdir)
816 print(
"********************************************************")
817 print(
"Will run on",len(jobName),
"workflows")
823 print(
">>>> This is Data!")
824 print(
">>>> Doing run based selection")
825 cmd =
'dasgoclient -limit=0 -query \'run dataset='+opts.data + (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
826 p = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
827 out, err = p.communicate()
829 listOfRuns=out.decode().
split(
"\n")
832 print(
"Will run on ",len(listOfRuns),
"runs: \n",listOfRuns)
836 print(
"first run:",opts.start,
"last run:",opts.end)
838 for run
in listOfRuns:
839 if (
int(run)<
int(opts.start)
or int(run)>
int(opts.end)):
840 print(
"excluding",run)
847 print(
"'======> taking",run)
850 mytuple.append((run,opts.data))
854 instances=[opts.instance
for entry
in mytuple]
855 pool = multiprocessing.Pool(processes=20)
856 count = pool.map(getFilesForRun,
zip(mytuple,instances))
857 file_info = dict(
zip(listOfRuns, count))
861 for run
in listOfRuns:
862 if (
int(run)<
int(opts.start)
or int(run)>
int(opts.end)):
863 print(
'rejecting run',run,
' becasue outside of boundaries')
867 print(
'rejecting run',run,
' becasue outside not in JSON')
890 od = collections.OrderedDict(sorted(file_info.items()))
898 print(
"|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
903 raise Exception(
'Will not run on any run.... please check again the configuration')
906 myLumiDB =
getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
909 pprint.pprint(myLumiDB)
912 for iConf
in range(len(jobName)):
913 print(
"This is Task n.",iConf+1,
"of",len(jobName))
918 scripts_dir = os.path.join(AnalysisStep_dir,
"scripts")
919 if not os.path.exists(scripts_dir):
920 os.makedirs(scripts_dir)
921 hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+
"_"+opts.taskname+
".sh")
922 fout = open(hadd_script_file,
'w')
924 output_file_list1=list()
925 output_file_list2=list()
926 output_file_list2.append(
"hadd ")
933 cmd =
'dasgoclient -query \'file dataset='+opts.data+ (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
934 s = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
935 out,err = s.communicate()
936 mylist = out.decode().
split(
'\n')
940 splitList =
split(mylist,10)
941 for files
in splitList:
942 inputFiles.append(files)
943 myRuns.append(
str(1))
945 print(
"this is DATA (not doing full run-based selection)")
946 print(runboundary[iConf])
947 cmd =
'dasgoclient -query \'file dataset='+opts.data+
' run='+runboundary[iConf]+ (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
949 s = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
950 out,err = s.communicate()
952 mylist = out.decode().
split(
'\n')
955 print(
"mylist:",mylist)
957 splitList =
split(mylist,10)
958 for files
in splitList:
959 inputFiles.append(files)
960 myRuns.append(
str(runboundary[iConf]))
962 myLumiDB =
getLuminosity(HOME,myRuns[0],myRuns[-1],
True,opts.verbose)
968 inputFiles.append(od[element])
979 print(
"myRuns =====>",myRuns)
985 for jobN,theSrcFiles
in enumerate(inputFiles):
987 print(
"JOB:",jobN,
"run",myRuns[jobN],theSrcFiles)
989 print(
"JOB:",jobN,
"run",myRuns[jobN])
998 thejobIndex=myRuns[jobN]
1000 thejobIndex=myRuns[jobN]+
"_"+
str(jobN)
1002 if (myRuns[jobN])
in myLumiDB:
1003 theLumi = myLumiDB[myRuns[jobN]]
1005 print(
"=====> COULD NOT FIND LUMI, setting default = 1/pb")
1007 print(
"int. lumi:",theLumi,
"/pb")
1013 runInfo[
"run"] = myRuns[jobN]
1015 runInfo[
"conf"] = jobName[iConf]
1016 runInfo[
"gt"] = gt[iConf]
1017 runInfo[
"allFromGT"] = allFromGT[iConf]
1018 runInfo[
"alignmentDB"] = alignmentDB[iConf]
1019 runInfo[
"alignmentTag"] = alignmentTAG[iConf]
1020 runInfo[
"apeDB"] = apeDB[iConf]
1021 runInfo[
"apeTag"] = apeTAG[iConf]
1022 runInfo[
"applyBows"] = applyBOWS[iConf]
1023 runInfo[
"bowDB"] = bowDB[iConf]
1024 runInfo[
"bowTag"] = bowTAG[iConf]
1025 runInfo[
"ptCut"] = ptcut[iConf]
1026 runInfo[
"lumilist"] = lumilist[iConf]
1027 runInfo[
"applyEXTRACOND"] = applyEXTRACOND[iConf]
1028 runInfo[
"conditions"] = conditions[iConf]
1029 runInfo[
"nfiles"] = len(theSrcFiles)
1030 runInfo[
"srcFiles"] = theSrcFiles
1031 runInfo[
"intLumi"] = theLumi
1033 updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1035 totalJobs=totalJobs+1
1037 aJob =
Job(opts.data,
1040 jobName[iConf],isDA[iConf],isMC[iConf],
1041 applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1042 myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1043 gt[iConf],allFromGT[iConf],
1044 alignmentDB[iConf], alignmentTAG[iConf],
1045 apeDB[iConf], apeTAG[iConf],
1046 bowDB[iConf], bowTAG[iConf],
1047 vertextype[iConf], tracktype[iConf],
1048 refittertype[iConf], ttrhtype[iConf],
1049 applyruncontrol[iConf],
1050 ptcut[iConf],input_CMSSW_BASE,AnalysisStep_dir)
1052 aJob.setEOSout(eosdir)
1053 aJob.createTheCfgFile(theSrcFiles)
1054 aJob.createTheBashFile()
1056 output_file_list1.append(
"xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+
" /tmp/$USER/"+opts.taskname+
" \n")
1058 theBashDir=aJob.BASH_dir
1059 theBaseName=aJob.getOutputBaseNameWithData()
1060 mergedFile =
"/tmp/$USER/"+opts.taskname+
"/"+aJob.getOutputBaseName()+
" "+opts.taskname+
".root" 1061 output_file_list2.append(
"/tmp/$USER/"+opts.taskname+
"/"+aJob.getOutputBaseName()+opts.taskname+
".root ")
1062 output_file_list2.append(
"/tmp/$USER/"+opts.taskname+
"/"+os.path.split(aJob.getOutputFileName())[1]+
" ")
1068 os.system(
"chmod u+x "+theBashDir+
"/*.sh")
1069 submissionCommand =
"condor_submit "+job_submit_file
1073 fout.write(
"#!/bin/bash \n")
1074 fout.write(
"MAIL=$USER@mail.cern.ch \n")
1075 fout.write(
"OUT_DIR="+eosdir+
"\n")
1076 fout.write(
"FILE="+
str(mergedFile)+
"\n")
1077 fout.write(
"echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1078 fout.write(
"cd "+os.path.join(input_CMSSW_BASE,
"src")+
"\n")
1079 fout.write(
"eval `scram r -sh` \n")
1080 fout.write(
"mkdir -p /tmp/$USER/"+opts.taskname+
" \n")
1081 fout.writelines(output_file_list1)
1082 fout.writelines(output_file_list2)
1084 fout.write(
"echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1085 fout.write(
"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1086 fout.write(
"echo \"Harvesting for "+opts.taskname+
" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +
" completed\" $MAIL \n")
1088 os.system(
"chmod u+x "+hadd_script_file)
1090 harvest_conditions =
'"' +
" && ".
join([
"ended(" + jobId +
")" for jobId
in batchJobIds]) +
'"' 1092 lastJobCommand =
"bsub -o harvester"+opts.taskname+
".tmp -q 1nh -w "+harvest_conditions+
" "+hadd_script_file
1099 del output_file_list1
1102 if __name__ ==
"__main__":
1107 def forward_proxy(rundir)
def isInJSON(run, jsonfile)
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def getNEvents(run, dataset)
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def createTheCfgFile(self, lfn)
def createTheLSFFile(self)
def getCommandOutput(command)
def optionxform(self, optionstr)
def mkdir_eos(out_path)
method to recursively create directories on EOS #############
def getOutputBaseName(self)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def __updateDict(self, dictionary, section)
def split(sequence, size)
def updateDB(run, runInfo)
static std::string join(char **cmd)
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)
def createTheBashFile(self)
def getOutputFileName(self)
def exists(self, section, option)
def getOutputBaseNameWithData(self)
def setEOSout(self, theEOSdir)