9 print 'Not injecting to wmagent in --show mode. Need to run the worklfows.'
11 if opt.wmcontrol==
'init':
14 if opt.wmcontrol==
'test':
17 if opt.wmcontrol==
'submit' and opt.nThreads==0:
18 print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.'
20 if opt.wmcontrol==
'force':
21 print "This is an expert setting, you'd better know what you're doing"
25 from modules.wma
import upload_to_couch
26 (filePath,labelInCouch,user,group,where) = arguments
27 cacheId=upload_to_couch(filePath,
43 for k
in options.split(
','):
44 if k.startswith(
'dqm:'):
45 self.
dqmgui=k.split(
':',1)[-1]
46 elif k.startswith(
'wma:'):
49 self.
testMode=((mode!=
'submit')
and (mode!=
'force'))
55 self.
wmagent=os.getenv(
'WMAGENT_REQMGR')
60 self.
dqmgui=
"https://cmsweb.cern.ch/dqm/relval"
65 self.
user = os.getenv(
'USER')
73 if not os.getenv(
'WMCORE_ROOT'):
74 print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
79 print '\n\tFound wmclient\n'
82 "RequestType" :
"TaskChain",
83 "SubRequestType" :
"RelVal",
84 "RequestPriority": 500000,
85 "Requestor": self.
user,
87 "CMSSWVersion": os.getenv(
'CMSSW_VERSION'),
88 "Campaign": os.getenv(
'CMSSW_VERSION'),
89 "ScramArch": os.getenv(
'SCRAM_ARCH'),
90 "ProcessingVersion": self.
version,
92 "CouchURL": self.
couch,
93 "ConfigCacheURL": self.
couch,
94 "DbsUrl":
"https://cmsweb.cern.ch/dbs/prod/global/DBSReader",
99 "unmergedLFNBase" :
"/store/unmerged",
100 "mergedLFNBase" :
"/store/relval",
101 "dashboardActivity" :
"relval",
103 "SizePerEvent" : 1234,
108 "EnableHarvesting" :
"True",
109 "DQMUploadUrl" : self.
dqmgui,
110 "DQMConfigCacheID" :
None
115 "ConfigCacheID" :
None,
117 "SplittingAlgo" :
"EventBased",
118 "EventsPerJob" :
None,
119 "RequestNumEvents" :
None,
120 "Seeding" :
"AutomaticSeeding",
121 "PrimaryDataset" :
None,
126 "TaskName" :
"DigiHLT",
127 "ConfigCacheID" :
None,
129 "InputDataset" :
None,
130 "SplittingAlgo" :
"LumiBased",
138 "InputFromOutputModule" :
None,
139 "ConfigCacheID" :
None,
141 "SplittingAlgo" :
"LumiBased",
150 def prepare(self,mReader, directories, mode='init'):
157 wmsplit[
'RECODreHLT']=2
161 wmsplit[
'DIGIUP15_PU50']=1
162 wmsplit[
'RECOUP15_PU50']=1
163 wmsplit[
'DIGIUP15_PU25']=1
164 wmsplit[
'RECOUP15_PU25']=1
165 wmsplit[
'DIGIHISt3']=5
166 wmsplit[
'RECODSplit']=1
167 wmsplit[
'SingleMuPt10_UP15_ID']=1
168 wmsplit[
'DIGIUP15_ID']=1
169 wmsplit[
'RECOUP15_ID']=1
170 wmsplit[
'TTbar_13_ID']=1
171 wmsplit[
'SingleMuPt10FS_ID']=1
172 wmsplit[
'TTbarFS_ID']=1
177 print "Not set up for step splitting"
181 for (n,dir)
in directories.items():
183 print "inspecting",dir
185 for (x,s)
in mReader.workFlowSteps.items():
194 if 'HARVESTGEN' in s[3]:
195 thisLabel=thisLabel+
"_gen"
200 if 'INPUT' in step
or (
not isinstance(s[2][index],str)):
201 nextHasDSInput=s[2][index]
209 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).
read())
211 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created"
214 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=
'RelVal'+s[1].
split(
'+')[0]
215 if not '--relval' in s[2][index]:
216 print 'Impossible to create task from scratch without splitting information with --relval'
219 arg=s[2][index].
split()
220 ns=
map(int,arg[arg.index(
'--relval')+1].
split(
','))
221 chainDict[
'nowmTasklist'][-1][
'RequestNumEvents'] = ns[0]
222 chainDict[
'nowmTasklist'][-1][
'EventsPerJob'] = ns[1]
223 if 'FASTSIM' in s[2][index]
or '--fast' in s[2][index]:
224 thisLabel+=
'_FastSim'
225 if 'lhe' in s[2][index]
in s[2][index]:
226 chainDict[
'nowmTasklist'][-1][
'LheInputFiles'] =
True
231 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).
read())
233 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created"
235 chainDict[
'nowmTasklist'][-1][
'InputDataset']=nextHasDSInput.dataSet
236 splitForThisWf=nextHasDSInput.split
237 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
239 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
241 if len(nextHasDSInput.run):
242 chainDict[
'nowmTasklist'][-1][
'RunWhitelist']=nextHasDSInput.run
244 if '--data' in s[2][index]
and nextHasDSInput.label:
245 thisLabel+=
'_RelVal_%s'%nextHasDSInput.label
246 if 'filter' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
247 print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
248 processStrPrefix=
'PU_'
249 setPrimaryDs =
'RelVal'+s[1].
split(
'+')[0]
251 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=setPrimaryDs
257 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).
read())
259 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created"
262 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
264 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
267 chainDict[
'nowmTasklist'][-1][
'TaskName']=step
269 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=setPrimaryDs
270 chainDict[
'nowmTasklist'][-1][
'ConfigCacheID']=
'%s/%s.py'%(dir,step)
271 chainDict[
'nowmTasklist'][-1][
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
272 chainDict[
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
273 if 'pileup' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
274 chainDict[
'nowmTasklist'][-1][
'MCPileup']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'pileup']
275 if '--pileup ' in s[2][index]:
276 processStrPrefix=
'PU_'
278 processStrPrefix=
'PU25ns_'
280 processStrPrefix=
'PU50ns_'
281 if 'DIGIPREMIX_S2' in s[2][index] :
283 processStrPrefix=
'PUpmx25ns_'
284 elif s[2][index].
split()[ s[2][index].
split().
index(
'--pileup_input')+1 ].
find(
'50ns') > 0 :
285 processStrPrefix=
'PUpmx50ns_'
289 chainDict[
'AcquisitionEra'][step]=chainDict[
'CMSSWVersion']
290 chainDict[
'ProcessingString'][step]=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'')+thisLabel
293 chainDict[
'nowmTasklist'][-1][
'AcquisitionEra']=chainDict[
'CMSSWVersion']
294 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'')+thisLabel
298 chainDict[
'RequestString']=
'RV'+chainDict[
'CMSSWVersion']+s[1].
split(
'+')[0]
299 if processStrPrefix
or thisLabel:
300 chainDict[
'RequestString']+=
'_'+processStrPrefix+thisLabel
309 for i_second
in reversed(range(len(chainDict[
'nowmTasklist']))):
310 t_second=chainDict[
'nowmTasklist'][i_second]
312 if 'primary' in t_second[
'nowmIO']:
314 primary=t_second[
'nowmIO'][
'primary'][0].
replace(
'file:',
'')
315 for i_input
in reversed(range(0,i_second)):
316 t_input=chainDict[
'nowmTasklist'][i_input]
317 for (om,o)
in t_input[
'nowmIO'].items():
320 t_second[
'InputTask'] = t_input[
'TaskName']
321 t_second[
'InputFromOutputModule'] = om
323 if t_second[
'TaskName'].startswith(
'HARVEST'):
325 chainDict[
'DQMConfigCacheID']=t_second[
'ConfigCacheID']
335 chainDict[
'AcquisitionEra'] = chainDict[
'AcquisitionEra'].
values()[0]
341 if type(i)==int
and i < len(chainDict[
'nowmTasklist']):
342 chainDict[
'nowmTasklist'][i][
'KeepOutput']=
True
343 for (i,t)
in enumerate(chainDict[
'nowmTasklist']):
344 if t[
'TaskName'].startswith(
'HARVEST'):
348 elif t[
'TaskName']
in self.
keep:
352 chainDict[
'Task%d'%(itask)]=t
359 chainDict[
'TaskChain']=itask
361 chainDict.pop(
'nowmTasklist')
368 labelInCouch=self.
label+
'_'+label
369 cacheName=filePath.split(
'/')[-1]
372 print '\tFake upload of',filePath,
'to couch with label',labelInCouch
376 from modules.wma
import upload_to_couch,DATABASE_NAME
378 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
383 print "Not re-uploading",filePath,
"to",where,
"for",label
386 print "Loading",filePath,
"to",where,
"for",label
388 pool = multiprocessing.Pool(1)
389 cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.
user,self.
group,where)] )
390 cacheId = cacheIds[0]
395 for (n,d)
in self.chainDicts.items():
397 if it.startswith(
"Task")
and it!=
'TaskChain':
399 couchID=self.
uploadConf(d[it][
'ConfigCacheID'],
400 str(n)+d[it][
'TaskName'],
403 print d[it][
'ConfigCacheID'],
" uploaded to couchDB for",str(n),
"with ID",couchID
404 d[it][
'ConfigCacheID']=couchID
405 if it ==
'DQMConfigCacheID':
406 couchID=self.
uploadConf(d[
'DQMConfigCacheID'],
410 print d[
'DQMConfigCacheID'],
"uploaded to couchDB for",str(n),
"with ID",couchID
411 d[
'DQMConfigCacheID']=couchID
416 from modules.wma
import makeRequest,approveRequest
417 from wmcontrol
import random_sleep
418 print '\n\tFound wmcontrol\n'
420 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
426 for (n,d)
in self.chainDicts.items():
428 print "Only viewing request",n
429 print pprint.pprint(d)
432 print "For eyes before submitting",n
433 print pprint.pprint(d)
434 print "Submitting",n,
"..........."
435 workFlow=makeRequest(self.
wmagent,d,encodeDict=
True)
436 approveRequest(self.
wmagent,workFlow)
437 print "...........",n,
"submitted"
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
def performInjectionOptionTest
def upload_to_couch_oneArg