3 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows, 6 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r 9 from __future__
import print_function
10 from builtins
import range
12 __author__ =
'Marco Musich' 13 __copyright__ =
'Copyright 2020, CERN CMS' 14 __credits__ = [
'Ernesto Migliore',
'Salvatore Di Guida']
15 __license__ =
'Unknown' 16 __maintainer__ =
'Marco Musich' 17 __email__ =
'marco.musich@cern.ch' 25 import configparser
as ConfigParser
29 from optparse
import OptionParser
30 from subprocess
import Popen, PIPE
34 import multiprocessing
##############################################
CopyRights  = '##################################\n'
CopyRights += '# submitPVValidationJobs.py #\n'
CopyRights += '# marco.musich@cern.ch #\n'
CopyRights += '# April 2020 #\n'
CopyRights += '##################################\n'

##############################################
def check_proxy():
##############################################
    """Check if GRID proxy has been initialized.

    Returns True when `voms-proxy-info --exists` succeeds, False otherwise.
    """
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout = dump, stderr = dump)
    except subprocess.CalledProcessError:
        return False
    return True
##############################################
def forward_proxy(rundir):
##############################################
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    # NOTE(review): check_output returns bytes in Python 3; shutil.copyfile
    # accepts a bytes path, so behavior is preserved without decoding.
    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))
##############################################
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
##############################################
    """Writes 'job.submit' file in `path`.

    Arguments:
    - `path`: job directory
    - `name`: name stem of the per-job scripts to be executed
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the full path of the written submit file.
    """
    job_submit_template="""\
universe              = vanilla
requirements          = (OpSysAndVer =?= "CentOS7")
executable            = {script:s}
output                = {jobm:s}/{out:s}.out
error                 = {jobm:s}/{out:s}.err
log                   = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour           = "{flavour:s}"
"""
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy        = "{proxy:s}"
"""
    # queue statement last so it follows any proxy line
    job_submit_template += "\nqueue {njobs:d}\n"

    job_submit_file = os.path.join(path, "job_"+name+".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.join(path, name+"_$(ProcId).sh"),
                                           out = name+"_$(ProcId)",
                                           jobm = os.path.abspath(path),
                                           flavour = "tomorrow",
                                           njobs = nruns,
                                           proxy = proxy_path))

    return job_submit_file
##############################################
def getCommandOutput(command):
##############################################
    """This function executes `command` and returns it output.

    Arguments:
    - `command`: Shell command to be invoked by this function.
    """
    child = os.popen(command)
    data = child.read()
    err = child.close()
    if err:
        print('%s failed w/ exit code %d' % (command, err))
    return data
##############################################
def getFilesForRun(blob):
##############################################
    """Queries DAS for the files of a given (run, dataset) pair.

    Arguments (packed for use with multiprocessing.Pool.map):
    - `blob`: ((run, dataset), instance) where instance may be None
    """
    cmd2 = ' dasgoclient -limit=0 -query \'file run='+blob[0][0]+' dataset='+blob[0][1]+ (' instance='+blob[1]+'\'' if (blob[1] is not None) else '\'')
    q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
    out, err = q.communicate()
    outputList = out.decode().split('\n')
    # drop the trailing empty element left after the final newline
    outputList.pop()
    return outputList
##############################################
def getNEvents(run, dataset):
##############################################
    """Returns the number of events in `dataset` for `run` (0 if DAS reports none)."""
    # BUGFIX: check_output returns bytes in Python 3, so the comparison with
    # the "[]\n" str literal could never be true; decode first.
    nEvents = subprocess.check_output(["das_client", "--limit", "0", "--query", "summary run={} dataset={} | grep summary.nevents".format(run, dataset)]).decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
##############################################
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
##############################################
    """Expects something like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
    +-------+------+--------+--------+-------------------+------------------+
    And extracts the total recorded luminosity (/b).

    Returns a dict mapping run number (str) -> recorded luminosity (str).
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
                                          "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
                                          "--output-style", "csv", "-c", "web"])
    except:
        # best-effort: an unreachable BRIL DB must not abort the submission
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if ("#" not in line):
            # csv line is "run:fill,...,totrecorded"; cache run -> recorded lumi
            runToCache  = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
##############################################
def isInJSON(run, jsonfile):
##############################################
    """Returns True if `run` is a key of the JSON lumi mask `jsonfile`.

    If the file cannot be read, warns and returns True (all runs used).
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except:
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
##############################################
def as_dict(config):
##############################################
    """Converts a ConfigParser object into a plain nested dict {section: {option: value}}."""
    dictionary = {}
    for section in config.sections():
        dictionary[section] = {}
        for option in config.options(section):
            dictionary[section][option] = config.get(section, option)
    return dictionary
##############################################
def to_bool(value):
##############################################
    """
       Converts 'something' to boolean. Raises exception for invalid formats
           Possible True  values: 1, True, "1", "TRue", "yes", "y", "t"
           Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    if str(value).lower() in ("yes", "y", "true",  "t", "1"): return True
    if str(value).lower() in ("no",  "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"): return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
##############################################
def updateDB2():
##############################################
    """Harvests info for all root-files/Run*.root files and caches it in runInfo.pkl."""
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            # NOTE(review): stores validity under a top-level key, not per run —
            # looks suspicious but is kept as in the original; confirm intent.
            infos["isValid"] = isValid(f)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
##############################################
def updateDB(run, runInfo):
##############################################
    """Adds/overwrites the info dict of `run` in the runInfo.pkl cache (in the cwd)."""
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    infos[run] = runInfo

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
def exists(self, section, option):
    """Returns True if `option` is present in `section`, False otherwise
    (including when the section itself is missing)."""
    try:
        items = self.items(section)
    except ConfigParser.NoSectionError:
        return False
    for item in items:
        if item[0] == option:
            return True
    return False
def __updateDict(self, dictionary, section):
    """Adds the options of `section` (plus any 'local<Section>' overrides) to `dictionary`."""
    result = dictionary
    try:
        for option in self.options(section):
            result[option] = self.get(section, option)
        # a "localXyz" section overrides the options of section "xyz"
        if "local"+section.title() in self.sections():
            for option in self.options("local"+section.title()):
                result[option] = self.get("local"+section.title(), option)
    except ConfigParser.NoSectionError as section:
        msg = ("%s in configuration files. This section is mandatory."
               % (str(section).replace(":", "", 1)))
        raise AllInOneError(msg)
    return result

def getResultingSection(self, section, defaultDict={}, demandPars=[]):
    """Returns a dict with the options of `section`; options listed in
    `demandPars` are mandatory and may fall back to a 'local<Section>' section.
    Raises AllInOneError when a mandatory option is missing."""
    # defaultDict is deep-copied, so the mutable default argument is never mutated
    result = copy.deepcopy(defaultDict)
    for option in demandPars:
        try:
            result[option] = self.get(section, option)
        except ConfigParser.NoOptionError as globalSectionError:
            globalSection = str(globalSectionError).split("'")[-2]
            splittedSectionName = section.split(":")
            if len(splittedSectionName) > 1:
                localSection = ("local"+section.split(":")[0].title()+":"
                                +section.split(":")[1])
            else:
                localSection = ("local"+section.split(":")[0].title())
            if self.has_section(localSection):
                try:
                    result[option] = self.get(localSection, option)
                except ConfigParser.NoOptionError as option:
                    msg = ("%s. This option is mandatory."
                           % (str(option).replace(":", "", 1).replace(
                               "section",
                               "section '"+globalSection+"' or", 1)))
                    raise AllInOneError(msg)
            else:
                msg = ("%s. This option is mandatory."
                       % (str(globalSectionError).replace(":", "", 1)))
                raise AllInOneError(msg)
    result = self.__updateDict(result, section)
    return result
##### method to parse the input file ################################
def ConfigSectionMap(config, section):
    """Returns the options of `section` as a dict (value None for options that fail to parse)."""
    the_dict = {}
    options = config.options(section)
    for option in options:
        try:
            the_dict[option] = config.get(section, option)
            if the_dict[option] == -1:
                DebugPrint("skip: %s" % option)
        except:
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
##### method to create recursively directories on EOS #############
def mkdir_eos(out_path):
    """Creates `out_path` on EOS, issuing one mkdir per path component below 'test_out'."""
    newpath = '/'
    for dir in out_path.split('/'):
        newpath = os.path.join(newpath, dir)
        # do not issue a mkdir for the initial (pre-existing) part of the path
        if newpath.find('test_out') > 0:
            command = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select mkdir "+newpath
            p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            p.wait()

    # now check that the directory exists
    command2 = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls "+out_path
    p = subprocess.Popen(command2, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()
    if p.returncode != 0:
        print(out)
def split(sequence, size):
    """
    Generator: arranges `sequence` in consecutive chunks of (at most) `size` elements.
    """
    for i in range(0, len(sequence), size):
        yield sequence[i:i+size]
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir):
    """Constructor of the Job class: stores all per-job configuration and
    precomputes the output file name stems."""
    # e.g. /HLTPhysics/Run2016C-TkAlMinBias-...-v1/ALCARECO -> HLTPhysics_Run2016C
    theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]

    self.data = theDataSet
    self.job_number = job_number
    self.job_id = job_id
    self.batch_job_id = None          # filled after submission (bsub output)
    self.job_name = job_name

    self.isDA = isDA
    self.isMC = isMC
    self.applyBOWS = applyBOWS
    self.applyEXTRACOND = applyEXTRACOND
    self.extraCondVect = extraconditions
    self.runboundary = runboundary
    self.lumilist = lumilist
    self.intlumi = intlumi
    self.maxevents = maxevents
    self.gt = gt
    self.allFromGT = allFromGT
    self.alignmentDB = alignmentDB
    self.alignmentTAG = alignmentTAG
    self.apeDB = apeDB
    self.apeTAG = apeTAG
    self.bowDB = bowDB
    self.bowTAG = bowTAG
    self.vertextype = vertextype
    self.tracktype = tracktype
    self.refittertype = refittertype
    self.ttrhtype = ttrhtype
    self.applyruncontrol = applyruncontrol
    self.ptcut = ptcut

    self.the_dir = the_dir
    self.CMSSW_dir = CMSSW_dir

    self.output_full_name = self.getOutputBaseName()+"_"+str(self.job_id)
    self.output_number_name = self.getOutputBaseNameWithData()+"_"+str(self.job_number)

    # filled by createTheCfgFile / createTheLSFFile / createTheBashFile
    self.cfg_dir = None
    self.outputCfgName = None
    self.LSF_dir = None
    self.output_LSF_name = None
    self.BASH_dir = None
    self.output_BASH_name = None

    self.lfn_list = list()
def setEOSout(self, theEOSdir):
    """Sets the EOS output directory for this job."""
    self.OUTDIR = theEOSdir
def getOutputBaseName(self):
    """Returns the base name of the output ROOT file."""
    return "PVValidation_"+self.job_name
def getOutputBaseNameWithData(self):
    """Returns the base name of the output ROOT file, including the dataset label."""
    return "PVValidation_"+self.job_name+"_"+self.data
def createTheCfgFile(self, lfn):
    """Writes the cmsRun python configuration for this job by filling the
    PVValidation_T_cfg.py template with this job's settings.

    Arguments:
    - `lfn`: list of input-file LFNs for this job
    """
    # write the cfg file
    self.cfg_dir = os.path.join(self.the_dir, "cfg")
    if not os.path.exists(self.cfg_dir):
        os.makedirs(self.cfg_dir)

    self.outputCfgName = self.output_full_name+"_cfg.py"
    fout = open(os.path.join(self.cfg_dir, self.outputCfgName), 'w')

    template_cfg_file = os.path.join(self.the_dir, "PVValidation_T_cfg.py")
    file = open(template_cfg_file, 'r')

    config_txt = '\n\n' + CopyRights + '\n\n'
    config_txt += file.read()
    # substitute all the *TEMPLATE placeholders of the template
    config_txt = config_txt.replace("ISDATEMPLATE", self.isDA)
    config_txt = config_txt.replace("ISMCTEMPLATE", self.isMC)
    config_txt = config_txt.replace("APPLYBOWSTEMPLATE", self.applyBOWS)
    config_txt = config_txt.replace("EXTRACONDTEMPLATE", self.applyEXTRACOND)
    config_txt = config_txt.replace("USEFILELISTTEMPLATE", "True")
    config_txt = config_txt.replace("RUNBOUNDARYTEMPLATE", self.runboundary)
    config_txt = config_txt.replace("LUMILISTTEMPLATE", self.lumilist)
    config_txt = config_txt.replace("MAXEVENTSTEMPLATE", self.maxevents)
    config_txt = config_txt.replace("GLOBALTAGTEMPLATE", self.gt)
    config_txt = config_txt.replace("ALLFROMGTTEMPLATE", self.allFromGT)
    config_txt = config_txt.replace("ALIGNOBJTEMPLATE", self.alignmentDB)
    config_txt = config_txt.replace("GEOMTAGTEMPLATE", self.alignmentTAG)
    config_txt = config_txt.replace("APEOBJTEMPLATE", self.apeDB)
    config_txt = config_txt.replace("ERRORTAGTEMPLATE", self.apeTAG)
    config_txt = config_txt.replace("BOWSOBJECTTEMPLATE", self.bowDB)
    config_txt = config_txt.replace("BOWSTAGTEMPLATE", self.bowTAG)
    config_txt = config_txt.replace("VERTEXTYPETEMPLATE", self.vertextype)
    config_txt = config_txt.replace("TRACKTYPETEMPLATE", self.tracktype)
    config_txt = config_txt.replace("REFITTERTEMPLATE", self.refittertype)
    config_txt = config_txt.replace("TTRHBUILDERTEMPLATE", self.ttrhtype)
    config_txt = config_txt.replace("PTCUTTEMPLATE", self.ptcut)
    config_txt = config_txt.replace("INTLUMITEMPLATE", self.intlumi)
    config_txt = config_txt.replace("RUNCONTROLTEMPLATE", self.applyruncontrol)
    lfn_with_quotes = map(lambda x: "\'"+x+"\'", lfn)
    config_txt = config_txt.replace("FILESOURCETEMPLATE", "["+",".join(lfn_with_quotes)+"]")
    config_txt = config_txt.replace("OUTFILETEMPLATE", self.output_full_name+".root")

    # build the snippet with the extra conditions (if any)
    textToWrite = ''
    for element in self.extraCondVect:
        if ("Rcd" in element):
            params = self.extraCondVect[element].split(',')
            text = '''\n
      process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
           connect = cms.string('{database}'),
           toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
                                      tag = cms.string('{tag}'),
                                      label = cms.untracked.string('{label}')
                                      )
                             )
           )
      process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
      '''.format(record = element, database = params[0], tag = params[1], label = (params[2] if len(params)>2 else ''))
            textToWrite += text

    if (self.applyEXTRACOND=="True"):
        if not self.extraCondVect:
            raise Exception('Requested extra conditions, but none provided')
        config_txt = config_txt.replace("END OF EXTRA CONDITIONS", textToWrite)
    else:
        print("INFO: Will not apply any extra conditions")

    fout.write(config_txt)

    file.close()
    fout.close()
def createTheLSFFile(self):
    """Writes the LSF (bsub) submission script for this job."""
    # directory to store the LSF scripts
    self.LSF_dir = os.path.join(self.the_dir, "LSF")
    if not os.path.exists(self.LSF_dir):
        os.makedirs(self.LSF_dir)

    self.output_LSF_name = self.output_full_name+".lsf"
    fout = open(os.path.join(self.LSF_dir, self.output_LSF_name), 'w')

    job_name = self.output_full_name

    log_dir = os.path.join(self.the_dir, "log")
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    fout.write("#!/bin/sh \n")
    fout.write("#BSUB -L /bin/sh\n")
    fout.write("#BSUB -J "+job_name+"\n")
    fout.write("#BSUB -o "+os.path.join(log_dir, job_name+".log")+"\n")
    fout.write("#BSUB -q cmscaf1nd \n")
    fout.write("JobName="+job_name+" \n")
    fout.write("OUT_DIR="+self.OUTDIR+" \n")
    fout.write("LXBATCH_DIR=`pwd` \n")
    fout.write("cd "+os.path.join(self.CMSSW_dir, "src")+" \n")
    fout.write("eval `scram runtime -sh` \n")
    fout.write("cd $LXBATCH_DIR \n")
    fout.write("cmsRun "+os.path.join(self.cfg_dir, self.outputCfgName)+" \n")
    fout.write("ls -lh . \n")
    # stage out every ROOT and txt output to EOS
    fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
    fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

    fout.close()
def createTheBashFile(self):
    """Writes the bash payload script executed by the HTCondor job."""
    # directory to store the BASH scripts
    self.BASH_dir = os.path.join(self.the_dir, "BASH")
    if not os.path.exists(self.BASH_dir):
        os.makedirs(self.BASH_dir)

    self.output_BASH_name = self.output_number_name+".sh"
    fout = open(os.path.join(self.BASH_dir, self.output_BASH_name), 'w')

    job_name = self.output_full_name

    log_dir = os.path.join(self.the_dir, "log")
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    fout.write("#!/bin/bash \n")
    fout.write("JobName="+job_name+" \n")
    fout.write("echo \"Job started at \" `date` \n")
    fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir, "src")+" \n")
    # the proxy is forwarded to the test directory by forward_proxy()
    fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
    fout.write("OUT_DIR="+self.OUTDIR+" \n")
    fout.write("LXBATCH_DIR=$PWD \n")
    fout.write("cd ${CMSSW_DIR} \n")
    fout.write("eval `scramv1 runtime -sh` \n")
    fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
    fout.write("cd $LXBATCH_DIR \n")
    fout.write("cp "+os.path.join(self.cfg_dir, self.outputCfgName)+" . \n")
    fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
    fout.write("cmsRun "+self.outputCfgName+" \n")
    fout.write("echo \"Content of working dir is \"`ls -lh` \n")
    # stage out every ROOT output to EOS
    fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
    fout.write("echo \"Job ended at \" `date` \n")
    fout.write("exit 0 \n")

    fout.close()
def getOutputFileName(self):
    """Returns the full EOS path of this job's output ROOT file."""
    return os.path.join(self.OUTDIR, self.output_full_name+".root")
def submit(self):
    """Makes the LSF script executable and submits it via bsub, storing the
    bsub output so the batch job id can be parsed later."""
    print("submit job", self.job_id)
    job_name = self.output_full_name
    submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir, self.output_LSF_name)
    child1 = os.system(submitcommand1)
    self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir, self.output_LSF_name))
def getBatchjobId(self):
    """Parses the batch job id out of the stored bsub output
    (e.g. 'Job <12345> is submitted to queue <cmscaf1nd>.')."""
    return self.batch_job_id.split("<")[1].split(">")[0]
611 print(
"Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
618 print(
'\n'+CopyRights)
620 HOME = os.environ.get(
'HOME')
623 input_CMSSW_BASE = os.environ.get(
'CMSSW_BASE')
624 AnalysisStep_dir = os.path.join(input_CMSSW_BASE,
"src/Alignment/OfflineValidation/test")
625 lib_path = os.path.abspath(AnalysisStep_dir)
626 sys.path.append(lib_path)
631 desc=
"""This is a description of %prog.""" 632 parser = OptionParser(description=desc,version=
'%prog version 0.1')
633 parser.add_option(
'-s',
'--submit', help=
'job submitted', dest=
'submit', action=
'store_true', default=
False)
634 parser.add_option(
'-j',
'--jobname', help=
'task name', dest=
'taskname', action=
'store', default=
'myTask')
635 parser.add_option(
'-D',
'--dataset', help=
'selected dataset', dest=
'data', action=
'store', default=
'')
636 parser.add_option(
'-r',
'--doRunBased',help=
'selected dataset', dest=
'doRunBased', action=
'store_true' , default=
False)
637 parser.add_option(
'-i',
'--input', help=
'set input configuration (overrides default)', dest=
'inputconfig',action=
'store',default=
None)
638 parser.add_option(
'-b',
'--begin', help=
'starting point', dest=
'start', action=
'store', default=
'1')
639 parser.add_option(
'-e',
'--end', help=
'ending point', dest=
'end', action=
'store', default=
'999999')
640 parser.add_option(
'-v',
'--verbose', help=
'verbose output', dest=
'verbose', action=
'store_true', default=
False)
641 parser.add_option(
'-u',
'--unitTest', help=
'unit tests?', dest=
'isUnitTest', action=
'store_true', default=
False)
642 parser.add_option(
'-I',
'--instance', help=
'DAS instance to use', dest=
'instance', action=
'store', default=
None)
643 (opts, args) = parser.parse_args()
645 now = datetime.datetime.now()
653 USER = os.environ.get(
'USER')
654 eosdir=os.path.join(
"/store/group/alca_trackeralign",USER,
"test_out",t)
659 print(
"Not going to create EOS folder. -s option has not been chosen")
692 ConfigFile = opts.inputconfig
694 if ConfigFile
is not None:
696 print(
"********************************************************")
697 print(
"* Parsing from input file:", ConfigFile,
" ")
700 config.read(ConfigFile)
702 print(
"Parsed the following configuration \n\n")
704 pprint.pprint(inputDict)
707 raise SystemExit(
"\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
714 doRunBased = opts.doRunBased
716 listOfValidations = config.getResultingSection(
"validations")
718 for item
in listOfValidations:
719 if (
bool(listOfValidations[item]) ==
True):
728 applyEXTRACOND.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'applyextracond'])
729 conditions.append(config.getResultingSection(
"ExtraConditions"))
731 alignmentDB.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'alignmentdb'])
732 alignmentTAG.append(
ConfigSectionMap(config,
"Conditions:"+item)[
'alignmenttag'])
744 if(config.exists(
"Refit",
"refittertype")):
747 refittertype.append(
str(RefitType.COMMON))
749 if(config.exists(
"Refit",
"ttrhtype")):
752 ttrhtype.append(
"WithAngleAndTemplate")
754 applyruncontrol.append(
ConfigSectionMap(config,
"Selection")[
'applyruncontrol'])
760 print(
"********************************************************")
761 print(
"* Parsing from command line *")
762 print(
"********************************************************")
764 jobName = [
'testing']
767 doRunBased = opts.doRunBased
768 maxevents = [
'10000']
770 gt = [
'74X_dataRun2_Prompt_v4']
771 allFromGT = [
'False']
772 applyEXTRACOND = [
'False']
773 conditions = [[(
'SiPixelTemplateDBObjectRcd',
'frontier://FrontierProd/CMS_CONDITIONS',
'SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
774 alignmentDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
775 alignmentTAG = [
'TrackerAlignment_Prompt']
776 apeDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
777 apeTAG = [
'TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
779 bowDB = [
'frontier://FrontierProd/CMS_CONDITIONS']
780 bowTAG = [
'TrackerSurafceDeformations_v1_express']
782 vertextype = [
'offlinePrimaryVertices']
783 tracktype = [
'ALCARECOTkAlMinBias']
785 applyruncontrol = [
'False']
792 print(
"********************************************************")
793 print(
"* Configuration info *")
794 print(
"********************************************************")
795 print(
"- submitted : ",opts.submit)
796 print(
"- taskname : ",opts.taskname)
797 print(
"- Jobname : ",jobName)
798 print(
"- use DA : ",isDA)
799 print(
"- is MC : ",isMC)
800 print(
"- is run-based: ",doRunBased)
801 print(
"- evts/job : ",maxevents)
802 print(
"- GlobatTag : ",gt)
803 print(
"- allFromGT? : ",allFromGT)
804 print(
"- extraCond? : ",applyEXTRACOND)
805 print(
"- extraCond : ",conditions)
806 print(
"- Align db : ",alignmentDB)
807 print(
"- Align tag : ",alignmentTAG)
808 print(
"- APE db : ",apeDB)
809 print(
"- APE tag : ",apeTAG)
810 print(
"- use bows? : ",applyBOWS)
811 print(
"- K&B db : ",bowDB)
812 print(
"- K&B tag : ",bowTAG)
813 print(
"- VertexColl : ",vertextype)
814 print(
"- TrackColl : ",tracktype)
815 print(
"- RefitterSeq : ",refittertype)
816 print(
"- TTRHBuilder : ",ttrhtype)
817 print(
"- RunControl? : ",applyruncontrol)
818 print(
"- Pt> ",ptcut)
819 print(
"- run= ",runboundary)
820 print(
"- JSON : ",lumilist)
821 print(
"- Out Dir : ",eosdir)
823 print(
"********************************************************")
824 print(
"Will run on",len(jobName),
"workflows")
830 print(
">>>> This is Data!")
831 print(
">>>> Doing run based selection")
832 cmd =
'dasgoclient -limit=0 -query \'run dataset='+opts.data + (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
833 p = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
834 out, err = p.communicate()
836 listOfRuns=out.decode().
split(
"\n")
839 print(
"Will run on ",len(listOfRuns),
"runs: \n",listOfRuns)
843 print(
"first run:",opts.start,
"last run:",opts.end)
845 for run
in listOfRuns:
846 if (
int(run)<
int(opts.start)
or int(run)>
int(opts.end)):
847 print(
"excluding",run)
854 print(
"'======> taking",run)
857 mytuple.append((run,opts.data))
861 instances=[opts.instance
for entry
in mytuple]
862 pool = multiprocessing.Pool(processes=20)
863 count = pool.map(getFilesForRun,
zip(mytuple,instances))
864 file_info = dict(
zip(listOfRuns, count))
868 for run
in listOfRuns:
869 if (
int(run)<
int(opts.start)
or int(run)>
int(opts.end)):
870 print(
'rejecting run',run,
' becasue outside of boundaries')
874 print(
'rejecting run',run,
' becasue outside not in JSON')
897 od = collections.OrderedDict(sorted(file_info.items()))
905 print(
"|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
910 raise Exception(
'Will not run on any run.... please check again the configuration')
913 myLumiDB =
getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
916 pprint.pprint(myLumiDB)
919 for iConf
in range(len(jobName)):
920 print(
"This is Task n.",iConf+1,
"of",len(jobName))
925 scripts_dir = os.path.join(AnalysisStep_dir,
"scripts")
926 if not os.path.exists(scripts_dir):
927 os.makedirs(scripts_dir)
928 hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+
"_"+opts.taskname+
".sh")
929 fout = open(hadd_script_file,
'w')
931 output_file_list1=list()
932 output_file_list2=list()
933 output_file_list2.append(
"hadd ")
940 cmd =
'dasgoclient -query \'file dataset='+opts.data+ (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
941 s = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
942 out,err = s.communicate()
943 mylist = out.decode().
split(
'\n')
947 splitList =
split(mylist,10)
948 for files
in splitList:
949 inputFiles.append(files)
950 myRuns.append(
str(1))
952 print(
"this is DATA (not doing full run-based selection)")
953 print(runboundary[iConf])
954 cmd =
'dasgoclient -query \'file dataset='+opts.data+
' run='+runboundary[iConf]+ (
' instance='+opts.instance+
'\'' if (opts.instance
is not None)
else '\'')
956 s = Popen(cmd , shell=
True, stdout=PIPE, stderr=PIPE)
957 out,err = s.communicate()
959 mylist = out.decode().
split(
'\n')
962 print(
"mylist:",mylist)
964 splitList =
split(mylist,10)
965 for files
in splitList:
966 inputFiles.append(files)
967 myRuns.append(
str(runboundary[iConf]))
969 myLumiDB =
getLuminosity(HOME,myRuns[0],myRuns[-1],
True,opts.verbose)
975 inputFiles.append(od[element])
986 print(
"myRuns =====>",myRuns)
992 for jobN,theSrcFiles
in enumerate(inputFiles):
994 print(
"JOB:",jobN,
"run",myRuns[jobN],theSrcFiles)
996 print(
"JOB:",jobN,
"run",myRuns[jobN])
1005 thejobIndex=myRuns[jobN]
1007 thejobIndex=myRuns[jobN]+
"_"+
str(jobN)
1009 if (myRuns[jobN])
in myLumiDB:
1010 theLumi = myLumiDB[myRuns[jobN]]
1012 print(
"=====> COULD NOT FIND LUMI, setting default = 1/pb")
1014 print(
"int. lumi:",theLumi,
"/pb")
1020 runInfo[
"run"] = myRuns[jobN]
1022 runInfo[
"conf"] = jobName[iConf]
1023 runInfo[
"gt"] = gt[iConf]
1024 runInfo[
"allFromGT"] = allFromGT[iConf]
1025 runInfo[
"alignmentDB"] = alignmentDB[iConf]
1026 runInfo[
"alignmentTag"] = alignmentTAG[iConf]
1027 runInfo[
"apeDB"] = apeDB[iConf]
1028 runInfo[
"apeTag"] = apeTAG[iConf]
1029 runInfo[
"applyBows"] = applyBOWS[iConf]
1030 runInfo[
"bowDB"] = bowDB[iConf]
1031 runInfo[
"bowTag"] = bowTAG[iConf]
1032 runInfo[
"ptCut"] = ptcut[iConf]
1033 runInfo[
"lumilist"] = lumilist[iConf]
1034 runInfo[
"applyEXTRACOND"] = applyEXTRACOND[iConf]
1035 runInfo[
"conditions"] = conditions[iConf]
1036 runInfo[
"nfiles"] = len(theSrcFiles)
1037 runInfo[
"srcFiles"] = theSrcFiles
1038 runInfo[
"intLumi"] = theLumi
1040 updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1042 totalJobs=totalJobs+1
1044 aJob =
Job(opts.data,
1047 jobName[iConf],isDA[iConf],isMC[iConf],
1048 applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1049 myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1050 gt[iConf],allFromGT[iConf],
1051 alignmentDB[iConf], alignmentTAG[iConf],
1052 apeDB[iConf], apeTAG[iConf],
1053 bowDB[iConf], bowTAG[iConf],
1054 vertextype[iConf], tracktype[iConf],
1055 refittertype[iConf], ttrhtype[iConf],
1056 applyruncontrol[iConf],
1057 ptcut[iConf],input_CMSSW_BASE,AnalysisStep_dir)
1059 aJob.setEOSout(eosdir)
1060 aJob.createTheCfgFile(theSrcFiles)
1061 aJob.createTheBashFile()
1063 output_file_list1.append(
"xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+
" /tmp/$USER/"+opts.taskname+
" \n")
1065 theBashDir=aJob.BASH_dir
1066 theBaseName=aJob.getOutputBaseNameWithData()
1067 mergedFile =
"/tmp/$USER/"+opts.taskname+
"/"+aJob.getOutputBaseName()+
" "+opts.taskname+
".root" 1068 output_file_list2.append(
"/tmp/$USER/"+opts.taskname+
"/"+aJob.getOutputBaseName()+opts.taskname+
".root ")
1069 output_file_list2.append(
"/tmp/$USER/"+opts.taskname+
"/"+os.path.split(aJob.getOutputFileName())[1]+
" ")
1075 os.system(
"chmod u+x "+theBashDir+
"/*.sh")
1076 submissionCommand =
"condor_submit "+job_submit_file
1080 fout.write(
"#!/bin/bash \n")
1081 fout.write(
"MAIL=$USER@mail.cern.ch \n")
1082 fout.write(
"OUT_DIR="+eosdir+
"\n")
1083 fout.write(
"FILE="+
str(mergedFile)+
"\n")
1084 fout.write(
"echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1085 fout.write(
"cd "+os.path.join(input_CMSSW_BASE,
"src")+
"\n")
1086 fout.write(
"eval `scram r -sh` \n")
1087 fout.write(
"mkdir -p /tmp/$USER/"+opts.taskname+
" \n")
1088 fout.writelines(output_file_list1)
1089 fout.writelines(output_file_list2)
1091 fout.write(
"echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1092 fout.write(
"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1093 fout.write(
"echo \"Harvesting for "+opts.taskname+
" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +
" completed\" $MAIL \n")
1095 os.system(
"chmod u+x "+hadd_script_file)
1097 harvest_conditions =
'"' +
" && ".
join([
"ended(" + jobId +
")" for jobId
in batchJobIds]) +
'"' 1099 lastJobCommand =
"bsub -o harvester"+opts.taskname+
".tmp -q 1nh -w "+harvest_conditions+
" "+hadd_script_file
1106 del output_file_list1
###################################################
if __name__ == "__main__":
    main()
1114 def forward_proxy(rundir)
def isInJSON(run, jsonfile)
— Classes —############################
const bool isValid(const Frame &aFrame, const FrameQuality &aQuality, const uint16_t aExpectedPos)
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose)
def getNEvents(run, dataset)
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)
def replace(string, replacements)
def getResultingSection(self, section, defaultDict={}, demandPars=[])
def createTheCfgFile(self, lfn)
def createTheLSFFile(self)
def getCommandOutput(command)
def optionxform(self, optionstr)
def mkdir_eos(out_path)
method to create recursively directories on EOS #############
def getOutputBaseName(self)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def ConfigSectionMap(config, section)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def __updateDict(self, dictionary, section)
def split(sequence, size)
def updateDB(run, runInfo)
static std::string join(char **cmd)
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None)
def createTheBashFile(self)
def getOutputFileName(self)
def exists(self, section, option)
def getOutputBaseNameWithData(self)
def setEOSout(self, theEOSdir)