1 from __future__
import print_function
11 print(
'Not injecting to wmagent in --show mode. Need to run the worklfows.')
13 if opt.wmcontrol==
'init':
16 if opt.wmcontrol==
'test':
19 if opt.wmcontrol==
'submit' and opt.nProcs==0:
20 print(
'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
22 if opt.wmcontrol==
'force':
23 print(
"This is an expert setting, you'd better know what you're doing")
27 from modules.wma
import upload_to_couch
28 (filePath,labelInCouch,user,group,where) = arguments
29 cacheId=upload_to_couch(filePath,
45 for k
in options.split(
','):
46 if k.startswith(
'dqm:'):
47 self.
dqmgui=k.split(
':',1)[-1]
48 elif k.startswith(
'wma:'):
51 self.
testMode=((mode!=
'submit')
and (mode!=
'force'))
64 self.
wmagent = os.getenv(
'WMAGENT_REQMGR')
71 self.
wmagent =
'cmsweb-testbed.cern.ch' 74 if opt.dbsUrl
is not None:
76 elif os.getenv(
'CMS_DBSREADER_URL')
is not None:
77 self.
DbsUrl = os.getenv(
'CMS_DBSREADER_URL')
81 self.
DbsUrl =
"https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader" 83 self.
DbsUrl =
"https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader" 86 self.
dqmgui=
"https://cmsweb.cern.ch/dqm/relval" 91 self.
user = os.getenv(
'USER')
99 if not os.getenv(
'WMCORE_ROOT'):
100 print(
'\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
105 print(
'\n\tFound wmclient\n')
108 "RequestType" :
"TaskChain",
109 "SubRequestType" :
"RelVal",
110 "RequestPriority": 500000,
111 "Requestor": self.
user,
113 "CMSSWVersion": os.getenv(
'CMSSW_VERSION'),
114 "Campaign": os.getenv(
'CMSSW_VERSION'),
115 "ScramArch": os.getenv(
'SCRAM_ARCH'),
116 "ProcessingVersion": self.
version,
118 "ConfigCacheUrl": self.
couch,
126 "SizePerEvent" : 1234,
128 "PrepID": os.getenv(
'CMSSW_VERSION')
132 "EnableHarvesting" :
"True",
133 "DQMUploadUrl" : self.
dqmgui,
134 "DQMConfigCacheID" :
None,
140 "ConfigCacheID" :
None,
142 "SplittingAlgo" :
"EventBased",
143 "EventsPerJob" :
None,
144 "RequestNumEvents" :
None,
145 "Seeding" :
"AutomaticSeeding",
146 "PrimaryDataset" :
None,
148 "Multicore" : opt.nThreads,
152 "TaskName" :
"DigiHLT",
153 "ConfigCacheID" :
None,
155 "InputDataset" :
None,
156 "SplittingAlgo" :
"LumiBased",
159 "Multicore" : opt.nThreads,
165 "InputFromOutputModule" :
None,
166 "ConfigCacheID" :
None,
168 "SplittingAlgo" :
"LumiBased",
171 "Multicore" : opt.nThreads,
180 Return a "wmsplit" dictionary that contain non-default LumisPerJob values 189 wmsplit[
'RECODreHLT']=2
193 wmsplit[
'DIGIUP15_PU50']=1
194 wmsplit[
'RECOUP15_PU50']=1
195 wmsplit[
'DIGIUP15_PU25']=1
196 wmsplit[
'RECOUP15_PU25']=1
197 wmsplit[
'DIGIUP15_PU25HS']=1
198 wmsplit[
'RECOUP15_PU25HS']=1
199 wmsplit[
'DIGIHIMIX']=5
200 wmsplit[
'RECOHIMIX']=5
201 wmsplit[
'RECODSplit']=1
202 wmsplit[
'SingleMuPt10_UP15_ID']=1
203 wmsplit[
'DIGIUP15_ID']=1
204 wmsplit[
'RECOUP15_ID']=1
205 wmsplit[
'TTbar_13_ID']=1
206 wmsplit[
'SingleMuPt10FS_ID']=1
207 wmsplit[
'TTbarFS_ID']=1
208 wmsplit[
'RECODR2_50nsreHLT']=5
209 wmsplit[
'RECODR2_25nsreHLT']=5
210 wmsplit[
'RECODR2_2016reHLT']=5
211 wmsplit[
'RECODR2_50nsreHLT_HIPM']=5
212 wmsplit[
'RECODR2_25nsreHLT_HIPM']=5
213 wmsplit[
'RECODR2_2016reHLT_HIPM']=1
214 wmsplit[
'RECODR2_2016reHLT_skimSingleMu']=1
215 wmsplit[
'RECODR2_2016reHLT_skimDoubleEG']=1
216 wmsplit[
'RECODR2_2016reHLT_skimMuonEG']=1
217 wmsplit[
'RECODR2_2016reHLT_skimJetHT']=1
218 wmsplit[
'RECODR2_2016reHLT_skimMET']=1
219 wmsplit[
'RECODR2_2016reHLT_skimSinglePh']=1
220 wmsplit[
'RECODR2_2016reHLT_skimMuOnia']=1
221 wmsplit[
'RECODR2_2016reHLT_skimSingleMu_HIPM']=1
222 wmsplit[
'RECODR2_2016reHLT_skimDoubleEG_HIPM']=1
223 wmsplit[
'RECODR2_2016reHLT_skimMuonEG_HIPM']=1
224 wmsplit[
'RECODR2_2016reHLT_skimJetHT_HIPM']=1
225 wmsplit[
'RECODR2_2016reHLT_skimMET_HIPM']=1
226 wmsplit[
'RECODR2_2016reHLT_skimSinglePh_HIPM']=1
227 wmsplit[
'RECODR2_2016reHLT_skimMuOnia_HIPM']=1
228 wmsplit[
'RECODR2_2017reHLT_Prompt']=1
229 wmsplit[
'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi']=1
230 wmsplit[
'RECODR2_2017reHLT_skimDoubleEG_Prompt']=1
231 wmsplit[
'RECODR2_2017reHLT_skimMET_Prompt']=1
232 wmsplit[
'RECODR2_2017reHLT_skimMuOnia_Prompt']=1
233 wmsplit[
'RECODR2_2017reHLT_Prompt_L1TEgDQM']=1
234 wmsplit[
'RECODR2_2018reHLT_Prompt']=1
235 wmsplit[
'RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi']=1
236 wmsplit[
'RECODR2_2018reHLT_skimDoubleEG_Prompt']=1
237 wmsplit[
'RECODR2_2018reHLT_skimJetHT_Prompt']=1
238 wmsplit[
'RECODR2_2018reHLT_skimMET_Prompt']=1
239 wmsplit[
'RECODR2_2018reHLT_skimMuOnia_Prompt']=1
240 wmsplit[
'RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM']=1
241 wmsplit[
'RECODR2_2018reHLT_skimMuonEG_Prompt']=1
242 wmsplit[
'RECODR2_2018reHLT_skimCharmonium_Prompt']=1
243 wmsplit[
'RECODR2_2018reHLT_skimJetHT_Prompt_HEfail']=1
244 wmsplit[
'RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig']=1
245 wmsplit[
'RECODR2_2018reHLTAlCaTkCosmics_Prompt']=1
246 wmsplit[
'RECODR2_2018reHLT_skimDisplacedJet_Prompt']=1
247 wmsplit[
'RECODR2_2018reHLT_ZBPrompt']=1
248 wmsplit[
'RECODR2_2018reHLT_Offline']=1
249 wmsplit[
'RECODR2_2018reHLT_skimSingleMu_Offline_Lumi']=1
250 wmsplit[
'RECODR2_2018reHLT_skimDoubleEG_Offline']=1
251 wmsplit[
'RECODR2_2018reHLT_skimJetHT_Offline']=1
252 wmsplit[
'RECODR2_2018reHLT_skimMET_Offline']=1
253 wmsplit[
'RECODR2_2018reHLT_skimMuOnia_Offline']=1
254 wmsplit[
'RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM']=1
255 wmsplit[
'RECODR2_2018reHLT_skimMuonEG_Offline']=1
256 wmsplit[
'RECODR2_2018reHLT_skimCharmonium_Offline']=1
257 wmsplit[
'RECODR2_2018reHLT_skimJetHT_Offline_HEfail']=1
258 wmsplit[
'RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig']=1
259 wmsplit[
'RECODR2_2018reHLTAlCaTkCosmics_Offline']=1
260 wmsplit[
'RECODR2_2018reHLT_skimDisplacedJet_Offline']=1
261 wmsplit[
'RECODR2_2018reHLT_ZBOffline']=1
262 wmsplit[
'HLTDR2_50ns']=1
263 wmsplit[
'HLTDR2_25ns']=1
264 wmsplit[
'HLTDR2_2016']=1
265 wmsplit[
'HLTDR2_2017']=1
266 wmsplit[
'HLTDR2_2018']=1
267 wmsplit[
'HLTDR2_2018_BadHcalMitig']=1
268 wmsplit[
'Hadronizer']=1
269 wmsplit[
'DIGIUP15']=1
270 wmsplit[
'RECOUP15']=1
271 wmsplit[
'RECOAODUP15']=5
272 wmsplit[
'DBLMINIAODMCUP15NODQM']=5
273 wmsplit[
'DigiFull']=5
274 wmsplit[
'RecoFull']=5
275 wmsplit[
'DigiFullPU']=1
276 wmsplit[
'RecoFullPU']=1
277 wmsplit[
'RECOHID11']=1
278 wmsplit[
'DigiFullTriggerPU_2023D17PU'] = 1
279 wmsplit[
'RecoFullGlobalPU_2023D17PU']=1
280 wmsplit[
'DIGIUP17']=1
281 wmsplit[
'RECOUP17']=1
282 wmsplit[
'DIGIUP17_PU25']=1
283 wmsplit[
'RECOUP17_PU25']=1
284 wmsplit[
'DIGICOS_UP16']=1
285 wmsplit[
'RECOCOS_UP16']=1
286 wmsplit[
'DIGICOS_UP17']=1
287 wmsplit[
'RECOCOS_UP17']=1
288 wmsplit[
'DIGICOS_UP18']=1
289 wmsplit[
'RECOCOS_UP18']=1
290 wmsplit[
'HYBRIDRepackHI2015VR']=1
291 wmsplit[
'HYBRIDZSHI2015']=1
292 wmsplit[
'RECOHID15']=1
293 wmsplit[
'RECOHID18']=1
294 except Exception
as ex:
295 print(
'Exception while building a wmsplit dictionary: %s' % (
str(ex)))
300 def prepare(self,mReader, directories, mode='init'):
301 wmsplit = MatrixInjector.get_wmsplit()
303 for (n,dir)
in directories.items():
305 print(
"inspecting",dir)
307 for (x,s)
in mReader.workFlowSteps.items():
317 if len( [step
for step
in s[3]
if "HARVESTGEN" in step] )>0:
318 chainDict[
'TimePerEvent']=0.01
319 thisLabel=thisLabel+
"_gen" 321 if len( [step
for step
in s[3]
if "DBLMINIAODMCUP15NODQM" in step] )>0:
322 thisLabel=thisLabel+
"_dblMiniAOD" 328 if 'INPUT' in step
or (
not isinstance(s[2][index],str)):
329 nextHasDSInput=s[2][index]
337 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).read())
339 print(
"Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created")
342 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=
'RelVal'+s[1].
split(
'+')[0]
343 if not '--relval' in s[2][index]:
344 print(
'Impossible to create task from scratch without splitting information with --relval')
347 arg=s[2][index].
split()
348 ns=
map(int,arg[arg.index(
'--relval')+1].
split(
','))
349 chainDict[
'nowmTasklist'][-1][
'RequestNumEvents'] = ns[0]
350 chainDict[
'nowmTasklist'][-1][
'EventsPerJob'] = ns[1]
351 if 'FASTSIM' in s[2][index]
or '--fast' in s[2][index]:
352 thisLabel+=
'_FastSim' 353 if 'lhe' in s[2][index]
in s[2][index]:
354 chainDict[
'nowmTasklist'][-1][
'LheInputFiles'] =
True 359 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).read())
361 print(
"Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created")
363 chainDict[
'nowmTasklist'][-1][
'InputDataset']=nextHasDSInput.dataSet
364 if (
'DQMHLTonRAWAOD' in step) :
365 chainDict[
'nowmTasklist'][-1][
'IncludeParents']=
True 366 splitForThisWf=nextHasDSInput.split
367 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
369 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
371 if len(nextHasDSInput.run):
372 chainDict[
'nowmTasklist'][-1][
'RunWhitelist']=nextHasDSInput.run
373 if len(nextHasDSInput.ls):
374 chainDict[
'nowmTasklist'][-1][
'LumiList']=nextHasDSInput.ls
376 if '--data' in s[2][index]
and nextHasDSInput.label:
377 thisLabel+=
'_RelVal_%s'%nextHasDSInput.label
378 if 'filter' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
379 print(
"This has an input DS and a filter sequence: very likely to be the PyQuen sample")
380 processStrPrefix=
'PU_' 381 setPrimaryDs =
'RelVal'+s[1].
split(
'+')[0]
383 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=setPrimaryDs
389 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).read())
391 print(
"Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created")
394 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
396 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
399 if 'Hadronizer' in step:
400 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[
'Hadronizer']
403 chainDict[
'nowmTasklist'][-1][
'TaskName']=step
405 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=setPrimaryDs
406 chainDict[
'nowmTasklist'][-1][
'ConfigCacheID']=
'%s/%s.py'%(dir,step)
407 chainDict[
'nowmTasklist'][-1][
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
408 chainDict[
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
409 if 'NANOEDM' in step :
410 nanoedmGT = chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
411 if 'NANOMERGE' in step :
412 chainDict[
'GlobalTag'] = nanoedmGT
413 if 'pileup' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
414 chainDict[
'nowmTasklist'][-1][
'MCPileup']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'pileup']
415 if '--pileup ' in s[2][index]:
416 processStrPrefix=
'PU_' 418 processStrPrefix=
'PU25ns_' 420 processStrPrefix=
'PU50ns_' 421 if 'premix_stage2' in s[2][index]
and '--pileup_input' in s[2][index]:
423 processStrPrefix=
'PUpmx25ns_' 424 elif s[2][index].
split()[ s[2][index].
split().
index(
'--pileup_input')+1 ].
find(
'50ns') > 0 :
425 processStrPrefix=
'PUpmx50ns_' 429 chainDict[
'AcquisitionEra'][step]=chainDict[
'CMSSWVersion']
430 chainDict[
'ProcessingString'][step]=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
431 if 'NANOMERGE' in step :
432 chainDict[
'ProcessingString'][step]=processStrPrefix+nanoedmGT.replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
435 chainDict[
'nowmTasklist'][-1][
'AcquisitionEra']=chainDict[
'CMSSWVersion']
436 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
437 if 'NANOMERGE' in step :
438 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=processStrPrefix+nanoedmGT.replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
441 chainDict[
'nowmTasklist'][-1][
'Campaign'] = chainDict[
'nowmTasklist'][-1][
'AcquisitionEra']+self.
batchName 444 if (
'DBLMINIAODMCUP15NODQM' in step):
445 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=chainDict[
'nowmTasklist'][-1][
'ProcessingString']+
'_miniAOD' 447 if( chainDict[
'nowmTasklist'][-1][
'Multicore'] ):
454 chainDict[
'nowmTasklist'][-1][
'Memory'] = self.
memoryOffset +
int( chainDict[
'nowmTasklist'][-1][
'Multicore'] -1 ) * self.
memPerCore 458 chainDict[
'RequestString']=
'RV'+chainDict[
'CMSSWVersion']+s[1].
split(
'+')[0]
459 if processStrPrefix
or thisLabel:
460 chainDict[
'RequestString']+=
'_'+processStrPrefix+thisLabel
463 chainDict[
'PrepID'] = chainDict[
'CMSSWVersion']+
'__'+self.
batchTime+
'-'+s[1].
split(
'+')[0]
465 chainDict[
'PrepID'] = chainDict[
'CMSSWVersion']+self.
batchName+
'-'+s[1].
split(
'+')[0]
467 chainDict[
'SubRequestType'] =
"HIRelVal" 474 for i_second
in reversed(range(len(chainDict[
'nowmTasklist']))):
475 t_second=chainDict[
'nowmTasklist'][i_second]
477 if 'primary' in t_second[
'nowmIO']:
479 primary=t_second[
'nowmIO'][
'primary'][0].
replace(
'file:',
'')
480 for i_input
in reversed(range(0,i_second)):
481 t_input=chainDict[
'nowmTasklist'][i_input]
482 for (om,o)
in t_input[
'nowmIO'].
items():
485 t_second[
'InputTask'] = t_input[
'TaskName']
486 t_second[
'InputFromOutputModule'] = om
488 if t_second[
'TaskName'].startswith(
'HARVEST'):
490 chainDict[
'DQMConfigCacheID']=t_second[
'ConfigCacheID']
510 chainDict[
'AcquisitionEra'] = chainDict[
'AcquisitionEra'].
values()[0]
511 chainDict[
'ProcessingString'] = chainDict[
'ProcessingString'].
values()[0]
513 chainDict[
'AcquisitionEra'] = chainDict[
'nowmTasklist'][0][
'AcquisitionEra']
514 chainDict[
'ProcessingString'] = chainDict[
'nowmTasklist'][0][
'ProcessingString']
518 chainDict[
'Campaign'] = chainDict[
'AcquisitionEra']+self.
batchName 524 if isinstance(i, int)
and i < len(chainDict[
'nowmTasklist']):
525 chainDict[
'nowmTasklist'][i][
'KeepOutput']=
True 526 for (i,t)
in enumerate(chainDict[
'nowmTasklist']):
527 if t[
'TaskName'].startswith(
'HARVEST'):
531 elif t[
'TaskName']
in self.
keep:
533 if t[
'TaskName'].startswith(
'HYBRIDRepackHI2015VR'):
534 t[
'KeepOutput']=
False 537 chainDict[
'Task%d'%(itask)]=t
544 chainDict[
'TaskChain']=itask
546 chainDict.pop(
'nowmTasklist')
553 labelInCouch=self.
label+
'_'+label
554 cacheName=filePath.split(
'/')[-1]
557 print(
'\tFake upload of',filePath,
'to couch with label',labelInCouch)
561 from modules.wma
import upload_to_couch,DATABASE_NAME
563 print(
'\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
568 print(
"Not re-uploading",filePath,
"to",where,
"for",label)
571 print(
"Loading",filePath,
"to",where,
"for",label)
573 pool = multiprocessing.Pool(1)
574 cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.
user,self.
group,where)] )
575 cacheId = cacheIds[0]
580 for (n,d)
in self.chainDicts.items():
582 if it.startswith(
"Task")
and it!=
'TaskChain':
584 couchID=self.
uploadConf(d[it][
'ConfigCacheID'],
585 str(n)+d[it][
'TaskName'],
588 print(d[it][
'ConfigCacheID'],
" uploaded to couchDB for",
str(n),
"with ID",couchID)
589 d[it][
'ConfigCacheID']=couchID
590 if it ==
'DQMConfigCacheID':
591 couchID=self.
uploadConf(d[
'DQMConfigCacheID'],
595 print(d[
'DQMConfigCacheID'],
"uploaded to couchDB for",
str(n),
"with ID",couchID)
596 d[
'DQMConfigCacheID']=couchID
601 from modules.wma
import makeRequest,approveRequest
602 from wmcontrol
import random_sleep
603 print(
'\n\tFound wmcontrol\n')
605 print(
'\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
611 for (n,d)
in self.chainDicts.items():
613 print(
"Only viewing request",n)
614 print(pprint.pprint(d))
617 print(
"For eyes before submitting",n)
618 print(pprint.pprint(d))
619 print(
"Submitting",n,
"...........")
620 workFlow=makeRequest(self.
wmagent,d,encodeDict=
True)
621 print(
"...........",n,
"submitted")
def prepare(self, mReader, directories, mode='init')
def performInjectionOptionTest(opt)
def uploadConf(self, filePath, label, where)
the info are not in the task specific dict but in the general dict t_input.update(copy.deepcopy(self.defaultHarvest)) t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
def replace(string, replacements)
S & print(S &os, JobReport::InputFile const &f)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
def __init__(self, opt, mode='init', options='')
def upload_to_couch_oneArg(arguments)