10 print 'Not injecting to wmagent in --show mode. Need to run the worklfows.' 12 if opt.wmcontrol==
'init':
15 if opt.wmcontrol==
'test':
18 if opt.wmcontrol==
'submit' and opt.nProcs==0:
19 print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.' 21 if opt.wmcontrol==
'force':
22 print "This is an expert setting, you'd better know what you're doing" 26 from modules.wma
import upload_to_couch
27 (filePath,labelInCouch,user,group,where) = arguments
28 cacheId=upload_to_couch(filePath,
44 for k
in options.split(
','):
45 if k.startswith(
'dqm:'):
46 self.
dqmgui=k.split(
':',1)[-1]
47 elif k.startswith(
'wma:'):
50 self.
testMode=((mode!=
'submit')
and (mode!=
'force'))
62 self.
wmagent=os.getenv(
'WMAGENT_REQMGR')
68 self.
wmagent =
'cmsweb-testbed.cern.ch' 69 self.
DbsUrl =
"https://"+self.
wmagent+
"/dbs/int/global/DBSReader" 72 self.
dqmgui=
"https://cmsweb.cern.ch/dqm/relval" 77 self.
user = os.getenv(
'USER')
85 if not os.getenv(
'WMCORE_ROOT'):
86 print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n' 91 print '\n\tFound wmclient\n' 94 "RequestType" :
"TaskChain",
95 "SubRequestType" :
"RelVal",
96 "RequestPriority": 500000,
97 "Requestor": self.
user,
99 "CMSSWVersion": os.getenv(
'CMSSW_VERSION'),
100 "Campaign": os.getenv(
'CMSSW_VERSION'),
101 "ScramArch": os.getenv(
'SCRAM_ARCH'),
102 "ProcessingVersion": self.
version,
104 "ConfigCacheUrl": self.
couch,
112 "SizePerEvent" : 1234,
113 "TimePerEvent" : 0.1,
114 "PrepID": os.getenv(
'CMSSW_VERSION')
118 "EnableHarvesting" :
"True",
119 "DQMUploadUrl" : self.
dqmgui,
120 "DQMConfigCacheID" :
None,
126 "ConfigCacheID" :
None,
128 "SplittingAlgo" :
"EventBased",
129 "EventsPerJob" :
None,
130 "RequestNumEvents" :
None,
131 "Seeding" :
"AutomaticSeeding",
132 "PrimaryDataset" :
None,
134 "Multicore" : opt.nThreads,
138 "TaskName" :
"DigiHLT",
139 "ConfigCacheID" :
None,
141 "InputDataset" :
None,
142 "SplittingAlgo" :
"LumiBased",
145 "Multicore" : opt.nThreads,
151 "InputFromOutputModule" :
None,
152 "ConfigCacheID" :
None,
154 "SplittingAlgo" :
"LumiBased",
157 "Multicore" : opt.nThreads,
164 def prepare(self,mReader, directories, mode='init'):
171 wmsplit[
'RECODreHLT']=2
175 wmsplit[
'DIGIUP15_PU50']=1
176 wmsplit[
'RECOUP15_PU50']=1
177 wmsplit[
'DIGIUP15_PU25']=1
178 wmsplit[
'RECOUP15_PU25']=1
179 wmsplit[
'DIGIUP15_PU25HS']=1
180 wmsplit[
'RECOUP15_PU25HS']=1
181 wmsplit[
'DIGIHIMIX']=5
182 wmsplit[
'RECOHIMIX']=5
183 wmsplit[
'RECODSplit']=1
184 wmsplit[
'SingleMuPt10_UP15_ID']=1
185 wmsplit[
'DIGIUP15_ID']=1
186 wmsplit[
'RECOUP15_ID']=1
187 wmsplit[
'TTbar_13_ID']=1
188 wmsplit[
'SingleMuPt10FS_ID']=1
189 wmsplit[
'TTbarFS_ID']=1
190 wmsplit[
'RECODR2_50nsreHLT']=5
191 wmsplit[
'RECODR2_25nsreHLT']=5
192 wmsplit[
'RECODR2_2016reHLT']=5
193 wmsplit[
'RECODR2_50nsreHLT_HIPM']=5
194 wmsplit[
'RECODR2_25nsreHLT_HIPM']=5
195 wmsplit[
'RECODR2_2016reHLT_HIPM']=1
196 wmsplit[
'RECODR2_2016reHLT_skimSingleMu']=1
197 wmsplit[
'RECODR2_2016reHLT_skimDoubleEG']=1
198 wmsplit[
'RECODR2_2016reHLT_skimMuonEG']=1
199 wmsplit[
'RECODR2_2016reHLT_skimJetHT']=1
200 wmsplit[
'RECODR2_2016reHLT_skimMET']=1
201 wmsplit[
'RECODR2_2016reHLT_skimSinglePh']=1
202 wmsplit[
'RECODR2_2016reHLT_skimMuOnia']=1
203 wmsplit[
'RECODR2_2016reHLT_skimSingleMu_HIPM']=1
204 wmsplit[
'RECODR2_2016reHLT_skimDoubleEG_HIPM']=1
205 wmsplit[
'RECODR2_2016reHLT_skimMuonEG_HIPM']=1
206 wmsplit[
'RECODR2_2016reHLT_skimJetHT_HIPM']=1
207 wmsplit[
'RECODR2_2016reHLT_skimMET_HIPM']=1
208 wmsplit[
'RECODR2_2016reHLT_skimSinglePh_HIPM']=1
209 wmsplit[
'RECODR2_2016reHLT_skimMuOnia_HIPM']=1
210 wmsplit[
'RECODR2_2017reHLT_Prompt']=1
211 wmsplit[
'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi']=1
212 wmsplit[
'RECODR2_2017reHLT_skimDoubleEG_Prompt']=1
213 wmsplit[
'RECODR2_2017reHLT_skimMET_Prompt']=1
214 wmsplit[
'RECODR2_2017reHLT_skimMuOnia_Prompt']=1
215 wmsplit[
'HLTDR2_50ns']=1
216 wmsplit[
'HLTDR2_25ns']=1
217 wmsplit[
'HLTDR2_2016']=1
218 wmsplit[
'HLTDR2_2017']=1
219 wmsplit[
'Hadronizer']=1
220 wmsplit[
'DIGIUP15']=1
221 wmsplit[
'RECOUP15']=1
222 wmsplit[
'RECOAODUP15']=5
223 wmsplit[
'DBLMINIAODMCUP15NODQM']=5
224 wmsplit[
'DigiFull']=5
225 wmsplit[
'RecoFull']=5
226 wmsplit[
'DigiFullPU']=1
227 wmsplit[
'RecoFullPU']=1
228 wmsplit[
'RECOHID11']=1
229 wmsplit[
'DigiFullTriggerPU_2023D17PU'] = 1
230 wmsplit[
'RecoFullGlobalPU_2023D17PU']=1
231 wmsplit[
'DIGIUP17']=1
232 wmsplit[
'RECOUP17']=1
233 wmsplit[
'DIGIUP17_PU25']=1
234 wmsplit[
'RECOUP17_PU25']=1
235 wmsplit[
'DIGICOS_UP17']=1
236 wmsplit[
'RECOCOS_UP17']=1
242 print "Not set up for step splitting" 246 for (n,dir)
in directories.items():
248 print "inspecting",dir
250 for (x,s)
in mReader.workFlowSteps.items():
260 if len( [step
for step
in s[3]
if "HARVESTGEN" in step] )>0:
261 chainDict[
'TimePerEvent']=0.01
262 thisLabel=thisLabel+
"_gen" 264 if len( [step
for step
in s[3]
if "DBLMINIAODMCUP15NODQM" in step] )>0:
265 thisLabel=thisLabel+
"_dblMiniAOD" 271 if 'INPUT' in step
or (
not isinstance(s[2][index],str)):
272 nextHasDSInput=s[2][index]
280 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).read())
282 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created" 285 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=
'RelVal'+s[1].
split(
'+')[0]
286 if not '--relval' in s[2][index]:
287 print 'Impossible to create task from scratch without splitting information with --relval' 290 arg=s[2][index].
split()
291 ns=
map(int,arg[arg.index(
'--relval')+1].
split(
','))
292 chainDict[
'nowmTasklist'][-1][
'RequestNumEvents'] = ns[0]
293 chainDict[
'nowmTasklist'][-1][
'EventsPerJob'] = ns[1]
294 if 'FASTSIM' in s[2][index]
or '--fast' in s[2][index]:
295 thisLabel+=
'_FastSim' 296 if 'lhe' in s[2][index]
in s[2][index]:
297 chainDict[
'nowmTasklist'][-1][
'LheInputFiles'] =
True 302 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).read())
304 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created" 306 chainDict[
'nowmTasklist'][-1][
'InputDataset']=nextHasDSInput.dataSet
307 splitForThisWf=nextHasDSInput.split
308 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
310 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
312 if len(nextHasDSInput.run):
313 chainDict[
'nowmTasklist'][-1][
'RunWhitelist']=nextHasDSInput.run
314 if len(nextHasDSInput.ls):
315 chainDict[
'nowmTasklist'][-1][
'LumiList']=nextHasDSInput.ls
317 if '--data' in s[2][index]
and nextHasDSInput.label:
318 thisLabel+=
'_RelVal_%s'%nextHasDSInput.label
319 if 'filter' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
320 print "This has an input DS and a filter sequence: very likely to be the PyQuen sample" 321 processStrPrefix=
'PU_' 322 setPrimaryDs =
'RelVal'+s[1].
split(
'+')[0]
324 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=setPrimaryDs
330 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).read())
332 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created" 335 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
337 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
340 if 'Hadronizer' in step:
341 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[
'Hadronizer']
344 chainDict[
'nowmTasklist'][-1][
'TaskName']=step
346 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=setPrimaryDs
347 chainDict[
'nowmTasklist'][-1][
'ConfigCacheID']=
'%s/%s.py'%(dir,step)
348 chainDict[
'nowmTasklist'][-1][
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
349 chainDict[
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
350 if 'NANOEDM' in step :
351 nanoedmGT = chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
352 if 'NANOMERGE' in step :
353 chainDict[
'GlobalTag'] = nanoedmGT
354 if 'pileup' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
355 chainDict[
'nowmTasklist'][-1][
'MCPileup']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'pileup']
356 if '--pileup ' in s[2][index]:
357 processStrPrefix=
'PU_' 359 processStrPrefix=
'PU25ns_' 361 processStrPrefix=
'PU50ns_' 362 if 'DIGIPREMIX_S2' in s[2][index] :
364 processStrPrefix=
'PUpmx25ns_' 365 elif s[2][index].
split()[ s[2][index].
split().
index(
'--pileup_input')+1 ].
find(
'50ns') > 0 :
366 processStrPrefix=
'PUpmx50ns_' 370 chainDict[
'AcquisitionEra'][step]=chainDict[
'CMSSWVersion']
371 chainDict[
'ProcessingString'][step]=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
372 if 'NANOMERGE' in step :
373 chainDict[
'ProcessingString'][step]=processStrPrefix+nanoedmGT.replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
376 chainDict[
'nowmTasklist'][-1][
'AcquisitionEra']=chainDict[
'CMSSWVersion']
377 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
378 if 'NANOMERGE' in step :
379 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=processStrPrefix+nanoedmGT.replace(
'::All',
'').
replace(
'-',
'_')+thisLabel
382 chainDict[
'nowmTasklist'][-1][
'Campaign'] = chainDict[
'nowmTasklist'][-1][
'AcquisitionEra']+self.
batchName 385 if (
'DBLMINIAODMCUP15NODQM' in step):
386 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=chainDict[
'nowmTasklist'][-1][
'ProcessingString']+
'_miniAOD' 388 if( chainDict[
'nowmTasklist'][-1][
'Multicore'] ):
395 chainDict[
'nowmTasklist'][-1][
'Memory'] = self.
memoryOffset +
int( chainDict[
'nowmTasklist'][-1][
'Multicore'] -1 ) * self.
memPerCore 399 chainDict[
'RequestString']=
'RV'+chainDict[
'CMSSWVersion']+s[1].
split(
'+')[0]
400 if processStrPrefix
or thisLabel:
401 chainDict[
'RequestString']+=
'_'+processStrPrefix+thisLabel
404 chainDict[
'PrepID'] = chainDict[
'CMSSWVersion']+
'__'+self.
batchTime+
'-'+s[1].
split(
'+')[0]
406 chainDict[
'PrepID'] = chainDict[
'CMSSWVersion']+self.
batchName+
'-'+s[1].
split(
'+')[0]
408 chainDict[
'SubRequestType'] =
"HIRelVal" 415 for i_second
in reversed(range(len(chainDict[
'nowmTasklist']))):
416 t_second=chainDict[
'nowmTasklist'][i_second]
418 if 'primary' in t_second[
'nowmIO']:
420 primary=t_second[
'nowmIO'][
'primary'][0].
replace(
'file:',
'')
421 for i_input
in reversed(range(0,i_second)):
422 t_input=chainDict[
'nowmTasklist'][i_input]
423 for (om,o)
in t_input[
'nowmIO'].
items():
426 t_second[
'InputTask'] = t_input[
'TaskName']
427 t_second[
'InputFromOutputModule'] = om
429 if t_second[
'TaskName'].startswith(
'HARVEST'):
431 chainDict[
'DQMConfigCacheID']=t_second[
'ConfigCacheID']
451 chainDict[
'AcquisitionEra'] = chainDict[
'AcquisitionEra'].
values()[0]
452 chainDict[
'ProcessingString'] = chainDict[
'ProcessingString'].
values()[0]
454 chainDict[
'AcquisitionEra'] = chainDict[
'nowmTasklist'][0][
'AcquisitionEra']
455 chainDict[
'ProcessingString'] = chainDict[
'nowmTasklist'][0][
'ProcessingString']
459 chainDict[
'Campaign'] = chainDict[
'AcquisitionEra']+self.
batchName 465 if type(i)==int
and i < len(chainDict[
'nowmTasklist']):
466 chainDict[
'nowmTasklist'][i][
'KeepOutput']=
True 467 for (i,t)
in enumerate(chainDict[
'nowmTasklist']):
468 if t[
'TaskName'].startswith(
'HARVEST'):
472 elif t[
'TaskName']
in self.
keep:
476 chainDict[
'Task%d'%(itask)]=t
483 chainDict[
'TaskChain']=itask
485 chainDict.pop(
'nowmTasklist')
492 labelInCouch=self.
label+
'_'+label
493 cacheName=filePath.split(
'/')[-1]
496 print '\tFake upload of',filePath,
'to couch with label',labelInCouch
500 from modules.wma
import upload_to_couch,DATABASE_NAME
502 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n' 507 print "Not re-uploading",filePath,
"to",where,
"for",label
510 print "Loading",filePath,
"to",where,
"for",label
512 pool = multiprocessing.Pool(1)
513 cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.
user,self.
group,where)] )
514 cacheId = cacheIds[0]
519 for (n,d)
in self.chainDicts.items():
521 if it.startswith(
"Task")
and it!=
'TaskChain':
523 couchID=self.
uploadConf(d[it][
'ConfigCacheID'],
524 str(n)+d[it][
'TaskName'],
527 print d[it][
'ConfigCacheID'],
" uploaded to couchDB for",
str(n),
"with ID",couchID
528 d[it][
'ConfigCacheID']=couchID
529 if it ==
'DQMConfigCacheID':
530 couchID=self.
uploadConf(d[
'DQMConfigCacheID'],
534 print d[
'DQMConfigCacheID'],
"uploaded to couchDB for",
str(n),
"with ID",couchID
535 d[
'DQMConfigCacheID']=couchID
540 from modules.wma
import makeRequest,approveRequest
541 from wmcontrol
import random_sleep
542 print '\n\tFound wmcontrol\n' 544 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n' 550 for (n,d)
in self.chainDicts.items():
552 print "Only viewing request",n
553 print pprint.pprint(d)
556 print "For eyes before submitting",n
557 print pprint.pprint(d)
558 print "Submitting",n,
"..........." 559 workFlow=makeRequest(self.
wmagent,d,encodeDict=
True)
560 print "...........",n,
"submitted"
def prepare(self, mReader, directories, mode='init')
def performInjectionOptionTest(opt)
def uploadConf(self, filePath, label, where)
the info are not in the task specific dict but in the general dict t_input.update(copy.deepcopy(self.defaultHarvest)) t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
def replace(string, replacements)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
def __init__(self, opt, mode='init', options='')
def upload_to_couch_oneArg(arguments)