9 print 'Not injecting to wmagent in --show mode. Need to run the worklfows.'
11 if opt.wmcontrol==
'init':
14 if opt.wmcontrol==
'test':
17 if opt.wmcontrol==
'submit' and opt.nThreads==0:
18 print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.'
20 if opt.wmcontrol==
'force':
21 print "This is an expert setting, you'd better know what you're doing"
25 from modules.wma
import upload_to_couch
26 (filePath,labelInCouch,user,group,where) = arguments
27 cacheId=upload_to_couch(filePath,
40 self.
testMode=((mode!=
'submit')
and (mode!=
'force'))
53 self.
user = os.getenv(
'USER')
61 if not os.getenv(
'WMCORE_ROOT'):
62 print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
67 print '\n\tFound wmclient\n'
70 "RequestType" :
"TaskChain",
71 "Requestor": self.
user,
73 "CMSSWVersion": os.getenv(
'CMSSW_VERSION'),
74 "Campaign": os.getenv(
'CMSSW_VERSION'),
75 "ScramArch": os.getenv(
'SCRAM_ARCH'),
76 "ProcessingVersion": self.
version,
78 "CouchURL": self.
couch,
79 "ConfigCacheURL": self.
couch,
80 "DbsUrl":
"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet",
83 "SiteWhitelist" : [
"T2_CH_CERN",
"T1_US_FNAL"],
86 "unmergedLFNBase" :
"/store/unmerged",
87 "mergedLFNBase" :
"/store/relval",
88 "dashboardActivity" :
"relval",
90 "SizePerEvent" : 1234,
95 "EnableHarvesting" :
"True",
96 "DQMUploadUrl" :
"https://cmsweb.cern.ch/dqm/relval",
97 "DQMConfigCacheID" :
None
102 "ConfigCacheID" :
None,
104 "SplittingAlgo" :
"EventBased",
105 "EventsPerJob" :
None,
106 "RequestNumEvents" :
None,
107 "Seeding" :
"AutomaticSeeding",
108 "PrimaryDataset" :
None,
113 "TaskName" :
"DigiHLT",
114 "ConfigCacheID" :
None,
116 "InputDataset" :
None,
117 "SplittingAlgo" :
"LumiBased",
125 "InputFromOutputModule" :
None,
126 "ConfigCacheID" :
None,
128 "SplittingAlgo" :
"LumiBased",
137 def prepare(self,mReader, directories, mode='init'):
141 pprint.pprint(wmsplit)
143 print "Not set up for step splitting"
147 for (n,dir)
in directories.items():
149 print "inspecting",dir
151 for (x,s)
in mReader.workFlowSteps.items():
156 chainDict[
'RequestString']=
'RV'+chainDict[
'CMSSWVersion']+s[1].
split(
'+')[0]
163 if 'INPUT' in step
or (
not isinstance(s[2][index],str)):
164 nextHasDSInput=s[2][index]
172 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).
read())
174 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created"
177 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=
'RelVal'+s[1].
split(
'+')[0]
178 if not '--relval' in s[2][index]:
179 print 'Impossible to create task from scratch without splitting information with --relval'
182 arg=s[2][index].
split()
183 ns=
map(int,arg[arg.index(
'--relval')+1].
split(
','))
184 chainDict[
'nowmTasklist'][-1][
'RequestNumEvents'] = ns[0]
185 chainDict[
'nowmTasklist'][-1][
'EventsPerJob'] = ns[1]
186 if 'FASTSIM' in s[2][index]
or '--fast' in s[2][index]:
187 thisLabel+=
'_FastSim'
192 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).
read())
194 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created"
196 chainDict[
'nowmTasklist'][-1][
'InputDataset']=nextHasDSInput.dataSet
197 splitForThisWf=nextHasDSInput.split
198 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
200 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
202 if len(nextHasDSInput.run):
203 chainDict[
'nowmTasklist'][-1][
'RunWhitelist']=nextHasDSInput.run
205 if '--data' in s[2][index]
and nextHasDSInput.label:
206 thisLabel+=
'_RelVal_%s'%nextHasDSInput.label
207 if 'filter' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
208 print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
209 processStrPrefix=
'PU_'
210 chainDict[
'nowmTasklist'][-1][
'PrimaryDataset']=
'RelVal'+s[1].
split(
'+')[0]
216 chainDict[
'nowmTasklist'][-1][
'nowmIO']=json.loads(open(
'%s/%s.io'%(dir,step)).
read())
218 print "Failed to find",
'%s/%s.io'%(dir,step),
".The workflows were probably not run on cfg not created"
221 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=splitForThisWf
223 chainDict[
'nowmTasklist'][-1][
'LumisPerJob']=wmsplit[step]
226 chainDict[
'nowmTasklist'][-1][
'TaskName']=step
227 chainDict[
'nowmTasklist'][-1][
'ConfigCacheID']=
'%s/%s.py'%(dir,step)
228 chainDict[
'nowmTasklist'][-1][
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
229 chainDict[
'GlobalTag']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'GT']
230 if 'pileup' in chainDict[
'nowmTasklist'][-1][
'nowmIO']:
231 chainDict[
'nowmTasklist'][-1][
'MCPileup']=chainDict[
'nowmTasklist'][-1][
'nowmIO'][
'pileup']
232 if '--pileup' in s[2][index]:
233 processStrPrefix=
'PU_'
236 chainDict[
'AcquisitionEra'][step]=chainDict[
'CMSSWVersion']
237 chainDict[
'ProcessingString'][step]=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'')+thisLabel
240 chainDict[
'nowmTasklist'][-1][
'AcquisitionEra']=chainDict[
'CMSSWVersion']
241 chainDict[
'nowmTasklist'][-1][
'ProcessingString']=processStrPrefix+chainDict[
'nowmTasklist'][-1][
'GlobalTag'].
replace(
'::All',
'')+thisLabel
250 for i_second
in reversed(range(len(chainDict[
'nowmTasklist']))):
251 t_second=chainDict[
'nowmTasklist'][i_second]
253 if 'primary' in t_second[
'nowmIO']:
255 primary=t_second[
'nowmIO'][
'primary'][0].
replace(
'file:',
'')
256 for i_input
in reversed(range(0,i_second)):
257 t_input=chainDict[
'nowmTasklist'][i_input]
258 for (om,o)
in t_input[
'nowmIO'].items():
261 t_second[
'InputTask'] = t_input[
'TaskName']
262 t_second[
'InputFromOutputModule'] = om
264 if t_second[
'TaskName'].startswith(
'HARVEST'):
266 chainDict[
'DQMConfigCacheID']=t_second[
'ConfigCacheID']
276 chainDict[
'AcquisitionEra'] = chainDict[
'AcquisitionEra'].
values()[0]
282 if type(i)==int
and i < len(chainDict[
'nowmTasklist']):
283 chainDict[
'nowmTasklist'][i][
'KeepOutput']=
True
284 for (i,t)
in enumerate(chainDict[
'nowmTasklist']):
285 if t[
'TaskName'].startswith(
'HARVEST'):
289 elif t[
'TaskName']
in self.
keep:
293 chainDict[
'Task%d'%(itask)]=t
300 chainDict[
'TaskChain']=itask
302 chainDict.pop(
'nowmTasklist')
309 labelInCouch=self.
label+
'_'+label
310 cacheName=filePath.split(
'/')[-1]
313 print '\tFake upload of',filePath,
'to couch with label',labelInCouch
317 from modules.wma
import upload_to_couch,DATABASE_NAME
319 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
324 print "Not re-uploading",filePath,
"to",where,
"for",label
327 print "Loading",filePath,
"to",where,
"for",label
329 pool = multiprocessing.Pool(1)
330 cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.
user,self.
group,where)] )
331 cacheId = cacheIds[0]
336 for (n,d)
in self.chainDicts.items():
338 if it.startswith(
"Task")
and it!=
'TaskChain':
340 couchID=self.
uploadConf(d[it][
'ConfigCacheID'],
341 str(n)+d[it][
'TaskName'],
344 print d[it][
'ConfigCacheID'],
" uploaded to couchDB for",str(n),
"with ID",couchID
345 d[it][
'ConfigCacheID']=couchID
346 if it ==
'DQMConfigCacheID':
347 couchID=self.
uploadConf(d[
'DQMConfigCacheID'],
351 print d[
'DQMConfigCacheID'],
"uploaded to couchDB for",str(n),
"with ID",couchID
352 d[
'DQMConfigCacheID']=couchID
357 from modules.wma
import makeRequest,approveRequest
358 from wmcontrol
import random_sleep
359 print '\n\tFound wmcontrol\n'
361 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
367 for (n,d)
in self.chainDicts.items():
369 print "Only viewing request",n
370 print pprint.pprint(d)
373 print "For eyes before submitting",n
374 print pprint.pprint(d)
375 print "Submitting",n,
"..........."
376 workFlow=makeRequest(self.
wmagent,d,encodeDict=
True)
377 approveRequest(self.
wmagent,workFlow)
378 print "...........",n,
"submitted"
def performInjectionOptionTest
def upload_to_couch_oneArg