CMS 3D CMS Logo

MatrixInjector.py
Go to the documentation of this file.
1 import sys
2 import json
3 import os
4 import copy
5 import multiprocessing
6 import time
7 
def performInjectionOptionTest(opt):
    """Sanity-check and normalize the wmcontrol-related options.

    Mutates ``opt`` in place (``nProcs``, ``dryRun``) for the test modes and
    exits via sys.exit(-1) for option combinations under which injection to
    wmagent cannot proceed.

    NOTE: the ``def`` header was lost in the extracted listing and has been
    restored here (signature per the module's own index: performInjectionOptionTest(opt)).
    The two messages below also fix the original 'worklfows' typo.
    """
    if opt.show:
        # --show only displays the workflows; nothing can be injected.
        print('Not injecting to wmagent in --show mode. Need to run the workflows.')
        sys.exit(-1)
    if opt.wmcontrol == 'init':
        # init means it'll be in test mode
        opt.nProcs = 0
    if opt.wmcontrol == 'test':
        # means the wf were created already, and we just dryRun it.
        opt.dryRun = True
    if opt.wmcontrol == 'submit' and opt.nProcs == 0:
        print('Not injecting to wmagent in -j 0 mode. Need to run the workflows.')
        sys.exit(-1)
    if opt.wmcontrol == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
24 
def upload_to_couch_oneArg(arguments):
    """Unpack a single argument tuple and forward it to upload_to_couch.

    Exists so the upload can be dispatched through multiprocessing.Pool.map,
    which hands the worker exactly one positional argument.
    Returns the couch cache id produced by upload_to_couch.
    """
    from modules.wma import upload_to_couch
    file_path, label_in_couch, user, group, where = arguments
    return upload_to_couch(file_path,
                           label_in_couch,
                           user,
                           group,
                           test_mode=False,
                           url=where)
35 
36 
38 
39  def __init__(self,opt,mode='init',options=''):
40  self.count=1040
41 
42  self.dqmgui=None
43  self.wmagent=None
44  for k in options.split(','):
45  if k.startswith('dqm:'):
46  self.dqmgui=k.split(':',1)[-1]
47  elif k.startswith('wma:'):
48  self.wmagent=k.split(':',1)[-1]
49 
50  self.testMode=((mode!='submit') and (mode!='force'))
51  self.version =1
52  self.keep = opt.keep
53  self.memoryOffset = opt.memoryOffset
54  self.memPerCore = opt.memPerCore
55  self.batchName = ''
56  self.batchTime = str(int(time.time()))
57  if(opt.batchName):
58  self.batchName = '__'+opt.batchName+'-'+self.batchTime
59 
60  #wagemt stuff
61  if not self.wmagent:
62  self.wmagent=os.getenv('WMAGENT_REQMGR')
63  if not self.wmagent:
64  if not opt.testbed :
65  self.wmagent = 'cmsweb.cern.ch'
66  self.DbsUrl = "https://"+self.wmagent+"/dbs/prod/global/DBSReader"
67  else :
68  self.wmagent = 'cmsweb-testbed.cern.ch'
69  self.DbsUrl = "https://"+self.wmagent+"/dbs/int/global/DBSReader"
70 
71  if not self.dqmgui:
72  self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
73  #couch stuff
74  self.couch = 'https://'+self.wmagent+'/couchdb'
75 # self.couchDB = 'reqmgr_config_cache'
76  self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
77  self.user = os.getenv('USER')
78  self.group = 'ppd'
79  self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
80  self.speciallabel=''
81  if opt.label:
82  self.speciallabel= '_'+opt.label
83 
84 
85  if not os.getenv('WMCORE_ROOT'):
86  print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
87  if not self.testMode:
88  print '\n\t QUIT\n'
89  sys.exit(-18)
90  else:
91  print '\n\tFound wmclient\n'
92 
93  self.defaultChain={
94  "RequestType" : "TaskChain", #this is how we handle relvals
95  "SubRequestType" : "RelVal", #this is how we handle relvals, now that TaskChain is also used for central MC production
96  "RequestPriority": 500000,
97  "Requestor": self.user, #Person responsible
98  "Group": self.group, #group for the request
99  "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
100  "Campaign": os.getenv('CMSSW_VERSION'), # = AcquisitionEra, will be reset later to the one of first task, will both be the CMSSW_VERSION
101  "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain)
102  "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain)
103  "GlobalTag": None, #Global Tag (overridden per task)
104  "ConfigCacheUrl": self.couch, #URL of CouchDB containing Config Cache
105  "DbsUrl": self.DbsUrl,
106  #- Will contain all configs for all Tasks
107  #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
108  "TaskChain" : None, #Define number of tasks in chain.
109  "nowmTasklist" : [], #a list of tasks as we put them in
110  "Multicore" : 1, # do not set multicore for the whole chain
111  "Memory" : 3000,
112  "SizePerEvent" : 1234,
113  "TimePerEvent" : 10,
114  "PrepID": os.getenv('CMSSW_VERSION')
115  }
116 
118  "EnableHarvesting" : "True",
119  "DQMUploadUrl" : self.dqmgui,
120  "DQMConfigCacheID" : None,
121  "Multicore" : 1 # hardcode Multicore to be 1 for Harvest
122  }
123 
125  "TaskName" : None, #Task Name
126  "ConfigCacheID" : None, #Generator Config id
127  "GlobalTag": None,
128  "SplittingAlgo" : "EventBased", #Splitting Algorithm
129  "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm
130  "RequestNumEvents" : None, #Total number of events to generate
131  "Seeding" : "AutomaticSeeding", #Random seeding method
132  "PrimaryDataset" : None, #Primary Dataset to be created
133  "nowmIO": {},
134  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
135  "KeepOutput" : False
136  }
138  "TaskName" : "DigiHLT", #Task Name
139  "ConfigCacheID" : None, #Processing Config id
140  "GlobalTag": None,
141  "InputDataset" : None, #Input Dataset to be processed
142  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
143  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
144  "nowmIO": {},
145  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
146  "KeepOutput" : False
147  }
148  self.defaultTask={
149  "TaskName" : None, #Task Name
150  "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry)
151  "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process
152  "ConfigCacheID" : None, #Processing Config id
153  "GlobalTag": None,
154  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
155  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
156  "nowmIO": {},
157  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
158  "KeepOutput" : False
159  }
160 
161  self.chainDicts={}
162 
163 
    def prepare(self,mReader, directories, mode='init'):
        """Build one TaskChain request dict per workflow and cache it.

        For each (workflow number, directory) pair in ``directories``, walks
        the matching entry of mReader.workFlowSteps, assembles a task dict
        per step from the defaults prepared in __init__ plus the step's
        ``<dir>/<step>.io`` file, wires tasks together through their
        input/output modules, and stores the finished request in
        self.chainDicts[n].

        Returns 0 on success, -12 when a scratch task lacks --relval
        splitting information, -15 when a step's .io file cannot be read.
        """
        try:
            #from Configuration.PyReleaseValidation.relval_steps import wmsplit
            # hard-coded per-step LumisPerJob overrides, keyed by step name
            wmsplit = {}
            wmsplit['DIGIHI']=5
            wmsplit['RECOHI']=5
            wmsplit['HLTD']=5
            wmsplit['RECODreHLT']=2
            wmsplit['DIGIPU']=4
            wmsplit['DIGIPU1']=4
            wmsplit['RECOPU1']=1
            wmsplit['DIGIUP15_PU50']=1
            wmsplit['RECOUP15_PU50']=1
            wmsplit['DIGIUP15_PU25']=1
            wmsplit['RECOUP15_PU25']=1
            wmsplit['DIGIUP15_PU25HS']=1
            wmsplit['RECOUP15_PU25HS']=1
            wmsplit['DIGIHIMIX']=5
            wmsplit['RECOHIMIX']=5
            wmsplit['RECODSplit']=1
            wmsplit['SingleMuPt10_UP15_ID']=1
            wmsplit['DIGIUP15_ID']=1
            wmsplit['RECOUP15_ID']=1
            wmsplit['TTbar_13_ID']=1
            wmsplit['SingleMuPt10FS_ID']=1
            wmsplit['TTbarFS_ID']=1
            wmsplit['RECODR2_50nsreHLT']=5
            wmsplit['RECODR2_25nsreHLT']=5
            wmsplit['RECODR2_2016reHLT']=5
            wmsplit['RECODR2_50nsreHLT_HIPM']=5
            wmsplit['RECODR2_25nsreHLT_HIPM']=5
            wmsplit['RECODR2_2016reHLT_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimSingleMu']=1
            wmsplit['RECODR2_2016reHLT_skimDoubleEG']=1
            wmsplit['RECODR2_2016reHLT_skimMuonEG']=1
            wmsplit['RECODR2_2016reHLT_skimJetHT']=1
            wmsplit['RECODR2_2016reHLT_skimMET']=1
            wmsplit['RECODR2_2016reHLT_skimSinglePh']=1
            wmsplit['RECODR2_2016reHLT_skimMuOnia']=1
            wmsplit['RECODR2_2016reHLT_skimSingleMu_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimDoubleEG_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimMuonEG_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimJetHT_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimMET_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimSinglePh_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimMuOnia_HIPM']=1
            wmsplit['RECODR2_2017reHLT_Prompt']=1
            wmsplit['RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi']=1
            wmsplit['RECODR2_2017reHLT_skimDoubleEG_Prompt']=1
            wmsplit['RECODR2_2017reHLT_skimMET_Prompt']=1
            wmsplit['RECODR2_2017reHLT_skimMuOnia_Prompt']=1
            wmsplit['RECODR2_2017reHLT_Prompt_L1TEgDQM']=1
            wmsplit['RECODR2_2018reHLT_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi']=1
            wmsplit['RECODR2_2018reHLT_skimDoubleEG_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimMET_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimMuOnia_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM']=1
            wmsplit['RECODR2_2018reHLT_skimMuonEG_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimCharmonium_Prompt']=1
            wmsplit['HLTDR2_50ns']=1
            wmsplit['HLTDR2_25ns']=1
            wmsplit['HLTDR2_2016']=1
            wmsplit['HLTDR2_2017']=1
            wmsplit['HLTDR2_2018']=1
            wmsplit['Hadronizer']=1
            wmsplit['DIGIUP15']=1
            wmsplit['RECOUP15']=1
            wmsplit['RECOAODUP15']=5
            wmsplit['DBLMINIAODMCUP15NODQM']=5
            wmsplit['DigiFull']=5
            wmsplit['RecoFull']=5
            wmsplit['DigiFullPU']=1
            wmsplit['RecoFullPU']=1
            wmsplit['RECOHID11']=1
            wmsplit['DigiFullTriggerPU_2023D17PU'] = 1
            wmsplit['RecoFullGlobalPU_2023D17PU']=1
            wmsplit['DIGIUP17']=1
            wmsplit['RECOUP17']=1
            wmsplit['DIGIUP17_PU25']=1
            wmsplit['RECOUP17_PU25']=1
            wmsplit['DIGICOS_UP17']=1
            wmsplit['RECOCOS_UP17']=1


            #import pprint
            #pprint.pprint(wmsplit)
        except:
            # NOTE(review): a bare except around plain assignments can only
            # hide programming errors; consider dropping the try entirely.
            print "Not set up for step splitting"
            wmsplit={}

        acqEra=False
        for (n,dir) in directories.items():
            chainDict=copy.deepcopy(self.defaultChain)
            print "inspecting",dir
            nextHasDSInput=None
            for (x,s) in mReader.workFlowSteps.items():
                #x has the format (num, prefix)
                #s has the format (num, name, commands, stepList)
                if x[0]==n:
                    #print "found",n,s[3]
                    #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                    index=0
                    splitForThisWf=None
                    thisLabel=self.speciallabel
                    #if 'HARVESTGEN' in s[3]:
                    if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
                        chainDict['TimePerEvent']=0.01
                        thisLabel=thisLabel+"_gen"
                    # for double miniAOD test
                    if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
                        thisLabel=thisLabel+"_dblMiniAOD"
                    processStrPrefix=''
                    setPrimaryDs=None
                    nanoedmGT=''
                    for step in s[3]:

                        if 'INPUT' in step or (not isinstance(s[2][index],str)):
                            # remember the dataset-input descriptor for the next step
                            nextHasDSInput=s[2][index]

                        else:

                            if (index==0):
                                #first step and not input -> gen part
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
                                    return -15

                                chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
                                if not '--relval' in s[2][index]:
                                    print 'Impossible to create task from scratch without splitting information with --relval'
                                    return -12
                                else:
                                    arg=s[2][index].split()
                                    ns=map(int,arg[arg.index('--relval')+1].split(','))
                                    chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
                                    chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
                                if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
                                    thisLabel+='_FastSim'
                                # NOTE(review): chained comparison -- the trailing
                                # `in s[2][index]` is always True for a non-empty
                                # command string, so this is effectively
                                # `'lhe' in s[2][index]`; looks like a paste typo.
                                if 'lhe' in s[2][index] in s[2][index]:
                                    chainDict['nowmTasklist'][-1]['LheInputFiles'] =True

                            elif nextHasDSInput:
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
                                    return -15
                                chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
                                if ('DQMHLTonRAWAOD' in step) :
                                    chainDict['nowmTasklist'][-1]['IncludeParents']=True
                                splitForThisWf=nextHasDSInput.split
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                                if step in wmsplit:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
                                # get the run numbers or #events
                                if len(nextHasDSInput.run):
                                    chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
                                if len(nextHasDSInput.ls):
                                    chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
                                #print "what is s",s[2][index]
                                if '--data' in s[2][index] and nextHasDSInput.label:
                                    thisLabel+='_RelVal_%s'%nextHasDSInput.label
                                if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
                                    print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
                                    processStrPrefix='PU_'
                                    setPrimaryDs = 'RelVal'+s[1].split('+')[0]
                                    if setPrimaryDs:
                                        chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
                                nextHasDSInput=None
                            else:
                                #not first step and no inputDS
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
                                    return -15
                                if splitForThisWf:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                                if step in wmsplit:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]

                            # change LumisPerJob for Hadronizer steps.
                            if 'Hadronizer' in step:
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']

                            #print step
                            chainDict['nowmTasklist'][-1]['TaskName']=step
                            if setPrimaryDs:
                                chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
                            chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
                            chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
                            chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
                            if 'NANOEDM' in step :
                                nanoedmGT = chainDict['nowmTasklist'][-1]['nowmIO']['GT']
                            if 'NANOMERGE' in step :
                                chainDict['GlobalTag'] = nanoedmGT
                            if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
                                chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
                            if '--pileup ' in s[2][index]: # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
                                processStrPrefix='PU_' # take care of pu overlay done with GEN-SIM mixing
                                if ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('25ns') > 0 :
                                    processStrPrefix='PU25ns_'
                                elif ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('50ns') > 0 :
                                    processStrPrefix='PU50ns_'
                            if 'premix_stage2' in s[2][index] and '--pileup_input' in s[2][index]: # take care of pu overlay done with DIGI mixing of premixed events
                                if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('25ns') > 0 :
                                    processStrPrefix='PUpmx25ns_'
                                elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('50ns') > 0 :
                                    processStrPrefix='PUpmx50ns_'

                            if acqEra:
                                #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                                chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
                                chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
                                if 'NANOMERGE' in step :
                                    chainDict['ProcessingString'][step]=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
                            else:
                                #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                                chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
                                chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
                                if 'NANOMERGE' in step :
                                    chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel

                            if (self.batchName):
                                chainDict['nowmTasklist'][-1]['Campaign'] = chainDict['nowmTasklist'][-1]['AcquisitionEra']+self.batchName

                            # specify different ProcessingString for double miniAOD dataset
                            if ('DBLMINIAODMCUP15NODQM' in step):
                                chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD'

                            if( chainDict['nowmTasklist'][-1]['Multicore'] ):
                                # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
                                # the number of threads is NO LONGER assumed to be the same for all tasks
                                # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
                                # now change to 1.5GB / additional thread according to discussion:
                                # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
                                #chainDict['nowmTasklist'][-1]['Memory'] = 3000 + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 )*1500
                                chainDict['nowmTasklist'][-1]['Memory'] = self.memoryOffset + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 ) * self.memPerCore

                        index+=1
                    #end of loop through steps
                    chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                    if processStrPrefix or thisLabel:
                        chainDict['RequestString']+='_'+processStrPrefix+thisLabel

                    ### PrepID
                    chainDict['PrepID'] = chainDict['CMSSWVersion']+'__'+self.batchTime+'-'+s[1].split('+')[0]
                    if(self.batchName):
                        chainDict['PrepID'] = chainDict['CMSSWVersion']+self.batchName+'-'+s[1].split('+')[0]
                    if( 'HIN' in self.batchName ):
                        chainDict['SubRequestType'] = "HIRelVal"

                    #wrap up for this one
                    import pprint
                    #print 'wrapping up'
                    #pprint.pprint(chainDict)
                    # loop on the task list: resolve each task's InputTask /
                    # InputFromOutputModule by matching its primary input file
                    # against the outputs of the tasks before it
                    for i_second in reversed(range(len(chainDict['nowmTasklist']))):
                        t_second=chainDict['nowmTasklist'][i_second]
                        #print "t_second taskname", t_second['TaskName']
                        if 'primary' in t_second['nowmIO']:
                            #print t_second['nowmIO']['primary']
                            primary=t_second['nowmIO']['primary'][0].replace('file:','')
                            for i_input in reversed(range(0,i_second)):
                                t_input=chainDict['nowmTasklist'][i_input]
                                for (om,o) in t_input['nowmIO'].items():
                                    if primary in o:
                                        #print "found",primary,"procuced by",om,"of",t_input['TaskName']
                                        t_second['InputTask'] = t_input['TaskName']
                                        t_second['InputFromOutputModule'] = om
                                        #print 't_second',pprint.pformat(t_second)
                                        if t_second['TaskName'].startswith('HARVEST'):
                                            chainDict.update(copy.deepcopy(self.defaultHarvest))
                                            chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
                                            ## the info are not in the task specific dict but in the general dict
                                            #t_input.update(copy.deepcopy(self.defaultHarvest))
                                            #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
                                        break

                    # agreed changes for wm injection:
                    # - Campaign: *optional* string during creation. It will default to AcqEra value if possible.
                    #   Otherwise it will be empty.
                    # - AcquisitionEra: *mandatory* string at request level during creation. *optional* string
                    #   at task level during creation. "optional" during assignment.
                    # - ProcessingString: *mandatory* string at request level during creation. *optional* string
                    #   at task level during creation. "optional" during assignment.
                    # - ProcessingVersion: *optional* during creation (default 1). *optional* during assignment.
                    #
                    # Which requires following changes here:
                    # - reset Global AcuisitionEra, ProcessingString to be the one in the first task
                    # - and also Campaign to be always the same as the AcquisitionEra

                    if acqEra:
                        chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
                        chainDict['ProcessingString'] = chainDict['ProcessingString'].values()[0]
                    else:
                        chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
                        chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']

                    ##### batch name appended to Campaign name
                    #chainDict['Campaign'] = chainDict['AcquisitionEra']
                    chainDict['Campaign'] = chainDict['AcquisitionEra']+self.batchName

                    ## clean things up now
                    itask=0
                    if self.keep:
                        for i in self.keep:
                            if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
                                chainDict['nowmTasklist'][i]['KeepOutput']=True
                    for (i,t) in enumerate(chainDict['nowmTasklist']):
                        if t['TaskName'].startswith('HARVEST'):
                            continue
                        if not self.keep:
                            t['KeepOutput']=True
                        elif t['TaskName'] in self.keep:
                            t['KeepOutput']=True
                        t.pop('nowmIO')
                        itask+=1
                        chainDict['Task%d'%(itask)]=t

                    ## provide the number of tasks
                    chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])

                    chainDict.pop('nowmTasklist')
                    self.chainDicts[n]=chainDict

        return 0
503 
504  def uploadConf(self,filePath,label,where):
505  labelInCouch=self.label+'_'+label
506  cacheName=filePath.split('/')[-1]
507  if self.testMode:
508  self.count+=1
509  print '\tFake upload of',filePath,'to couch with label',labelInCouch
510  return self.count
511  else:
512  try:
513  from modules.wma import upload_to_couch,DATABASE_NAME
514  except:
515  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
516  print '\n\t QUIT\n'
517  sys.exit(-16)
518 
519  if cacheName in self.couchCache:
520  print "Not re-uploading",filePath,"to",where,"for",label
521  cacheId=self.couchCache[cacheName]
522  else:
523  print "Loading",filePath,"to",where,"for",label
524  ## totally fork the upload to couch to prevent cross loading of process configurations
525  pool = multiprocessing.Pool(1)
526  cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
527  cacheId = cacheIds[0]
528  self.couchCache[cacheName]=cacheId
529  return cacheId
530 
531  def upload(self):
532  for (n,d) in self.chainDicts.items():
533  for it in d:
534  if it.startswith("Task") and it!='TaskChain':
535  #upload
536  couchID=self.uploadConf(d[it]['ConfigCacheID'],
537  str(n)+d[it]['TaskName'],
538  d['ConfigCacheUrl']
539  )
540  print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID
541  d[it]['ConfigCacheID']=couchID
542  if it =='DQMConfigCacheID':
543  couchID=self.uploadConf(d['DQMConfigCacheID'],
544  str(n)+'harvesting',
545  d['ConfigCacheUrl']
546  )
547  print d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
548  d['DQMConfigCacheID']=couchID
549 
550 
551  def submit(self):
552  try:
553  from modules.wma import makeRequest,approveRequest
554  from wmcontrol import random_sleep
555  print '\n\tFound wmcontrol\n'
556  except:
557  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
558  if not self.testMode:
559  print '\n\t QUIT\n'
560  sys.exit(-17)
561 
562  import pprint
563  for (n,d) in self.chainDicts.items():
564  if self.testMode:
565  print "Only viewing request",n
566  print pprint.pprint(d)
567  else:
568  #submit to wmagent each dict
569  print "For eyes before submitting",n
570  print pprint.pprint(d)
571  print "Submitting",n,"..........."
572  workFlow=makeRequest(self.wmagent,d,encodeDict=True)
573  print "...........",n,"submitted"
574  random_sleep()
575 
576 
577 
def prepare(self, mReader, directories, mode='init')
def performInjectionOptionTest(opt)
def uploadConf(self, filePath, label, where)
the info are not in the task specific dict but in the general dict: t_input.update(copy.deepcopy(self.defaultHarvest)); t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
def replace(string, replacements)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
def __init__(self, opt, mode='init', options='')
#define str(s)
double split
Definition: MVATrainer.cc:139
def upload_to_couch_oneArg(arguments)