
MatrixInjector.py
from __future__ import print_function
import sys
import json
import os
import copy
import multiprocessing
import time

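# Summary of the --wmcontrol modes handled below: 'init' creates the requests
# in test mode, 'test' dry-runs requests that were already created, 'submit'
# injects them into the WMAgent (and requires -j > 0), and 'force' is an
# expert setting that also implies a dry run.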
def performInjectionOptionTest(opt):
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the workflows.')
        sys.exit(-1)
    if opt.wmcontrol=='init':
        #init means it'll be in test mode
        opt.nProcs=0
    if opt.wmcontrol=='test':
        #means the wf were created already, and we just dryRun it.
        opt.dryRun=True
    if opt.wmcontrol=='submit' and opt.nProcs==0:
        print('Not injecting to wmagent in -j 0 mode. Need to run the workflows.')
        sys.exit(-1)
    if opt.wmcontrol=='force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun=True

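# Note: this helper is kept at module level (rather than as a method of
# MatrixInjector) so that multiprocessing.Pool, used in uploadConf() below,
# can pickle it when forking the couch upload into a separate process.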
def upload_to_couch_oneArg(arguments):
    from modules.wma import upload_to_couch
    (filePath,labelInCouch,user,group,where) = arguments
    cacheId=upload_to_couch(filePath,
                            labelInCouch,
                            user,
                            group,
                            test_mode=False,
                            url=where)
    return cacheId


class MatrixInjector(object):

    def __init__(self,opt,mode='init',options=''):
        self.count=1040

        self.dqmgui=None
        self.wmagent=None
        for k in options.split(','):
            if k.startswith('dqm:'):
                self.dqmgui=k.split(':',1)[-1]
            elif k.startswith('wma:'):
                self.wmagent=k.split(':',1)[-1]

        self.testMode=((mode!='submit') and (mode!='force'))
        self.version = 1
        self.keep = opt.keep
        self.memoryOffset = opt.memoryOffset
        self.memPerCore = opt.memPerCore
        self.batchName = ''
        self.batchTime = str(int(time.time()))
        if opt.batchName:
            self.batchName = '__'+opt.batchName+'-'+self.batchTime

        # WMAgent url
        if not self.wmagent:
            # Overwrite with env variable
            self.wmagent = os.getenv('WMAGENT_REQMGR')

        if not self.wmagent:
            # Default values
            if not opt.testbed:
                self.wmagent = 'cmsweb.cern.ch'
            else:
                self.wmagent = 'cmsweb-testbed.cern.ch'

        # DBSReader url
        if opt.dbsUrl is not None:
            self.DbsUrl = opt.dbsUrl
        elif os.getenv('CMS_DBSREADER_URL') is not None:
            self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
        else:
            # Default values
            if not opt.testbed:
                self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
            else:
                self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"

        if not self.dqmgui:
            self.dqmgui="https://cmsweb.cern.ch/dqm/relval"

        #couch stuff
        self.couch = 'https://'+self.wmagent+'/couchdb'
        #self.couchDB = 'reqmgr_config_cache'
        self.couchCache={} # so that we do not upload like crazy, and recycle cfgs
        self.user = os.getenv('USER')
        self.group = 'ppd'
        self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
        self.speciallabel=''
        if opt.label:
            self.speciallabel = '_'+opt.label

        if not os.getenv('WMCORE_ROOT'):
            print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-18)
        else:
            print('\n\tFound wmclient\n')

        self.defaultChain={
            "RequestType" : "TaskChain",                #this is how we handle relvals
            "SubRequestType" : "RelVal",                #this is how we handle relvals, now that TaskChain is also used for central MC production
            "RequestPriority": 500000,
            "Requestor": self.user,                     #Person responsible
            "Group": self.group,                        #group for the request
            "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
            "Campaign": os.getenv('CMSSW_VERSION'),     # = AcquisitionEra, will be reset later to the one of the first task; both will be the CMSSW_VERSION
            "ScramArch": os.getenv('SCRAM_ARCH'),       #Scram Arch (used for all tasks in chain)
            "ProcessingVersion": self.version,          #Processing Version (used for all tasks in chain)
            "GlobalTag": None,                          #Global Tag (overridden per task)
            "ConfigCacheUrl": self.couch,               #URL of CouchDB containing Config Cache
            "DbsUrl": self.DbsUrl,
            #- Will contain all configs for all Tasks
            #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"],   #Site whitelist
            "TaskChain" : None,                         #Define number of tasks in chain.
            "nowmTasklist" : [],                        #a list of tasks as we put them in
            "Multicore" : 1,                            # do not set multicore for the whole chain
            "Memory" : 3000,
            "SizePerEvent" : 1234,
            "TimePerEvent" : 10,
            "PrepID": os.getenv('CMSSW_VERSION')
            }

        self.defaultHarvest={
            "EnableHarvesting" : "True",
            "DQMUploadUrl" : self.dqmgui,
            "DQMConfigCacheID" : None,
            "Multicore" : 1                             # hardcode Multicore to be 1 for Harvest
            }

        self.defaultScratch={
            "TaskName" : None,                          #Task Name
            "ConfigCacheID" : None,                     #Generator Config id
            "GlobalTag": None,
            "SplittingAlgo" : "EventBased",             #Splitting Algorithm
            "EventsPerJob" : None,                      #Size of jobs in terms of splitting algorithm
            "RequestNumEvents" : None,                  #Total number of events to generate
            "Seeding" : "AutomaticSeeding",             #Random seeding method
            "PrimaryDataset" : None,                    #Primary Dataset to be created
            "nowmIO": {},
            "Multicore" : opt.nThreads,                 # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
            "KeepOutput" : False
            }

        self.defaultInput={
            "TaskName" : "DigiHLT",                     #Task Name
            "ConfigCacheID" : None,                     #Processing Config id
            "GlobalTag": None,
            "InputDataset" : None,                      #Input Dataset to be processed
            "SplittingAlgo" : "LumiBased",              #Splitting Algorithm
            "LumisPerJob" : 10,                         #Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "Multicore" : opt.nThreads,                 # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
            "KeepOutput" : False
            }

        self.defaultTask={
            "TaskName" : None,                          #Task Name
            "InputTask" : None,                         #Input Task Name (Task Name field of a previous Task entry)
            "InputFromOutputModule" : None,             #OutputModule name in the input task that will provide files to process
            "ConfigCacheID" : None,                     #Processing Config id
            "GlobalTag": None,
            "SplittingAlgo" : "LumiBased",              #Splitting Algorithm
            "LumisPerJob" : 10,                         #Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "Multicore" : opt.nThreads,                 # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
            "KeepOutput" : False
            }
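        # prepare() below instantiates one of the three task templates per step:
        # defaultScratch for a first step generated from scratch, defaultInput
        # for a step reading an existing input dataset, and defaultTask for any
        # later step chained to the output of a previous task.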

        self.chainDicts={}

    @staticmethod
    def get_wmsplit():
        """
        Return a "wmsplit" dictionary that contains non-default LumisPerJob values
        """
        wmsplit = {}
        try:
            #from Configuration.PyReleaseValidation.relval_steps import wmsplit
            wmsplit = {}
            wmsplit['DIGIHI']=5
            wmsplit['RECOHI']=5
            wmsplit['HLTD']=5
            wmsplit['RECODreHLT']=2
            wmsplit['DIGIPU']=4
            wmsplit['DIGIPU1']=4
            wmsplit['RECOPU1']=1
            wmsplit['DIGIUP15_PU50']=1
            wmsplit['RECOUP15_PU50']=1
            wmsplit['DIGIUP15_PU25']=1
            wmsplit['RECOUP15_PU25']=1
            wmsplit['DIGIUP15_PU25HS']=1
            wmsplit['RECOUP15_PU25HS']=1
            wmsplit['DIGIHIMIX']=5
            wmsplit['RECOHIMIX']=5
            wmsplit['RECODSplit']=1
            wmsplit['SingleMuPt10_UP15_ID']=1
            wmsplit['DIGIUP15_ID']=1
            wmsplit['RECOUP15_ID']=1
            wmsplit['TTbar_13_ID']=1
            wmsplit['SingleMuPt10FS_ID']=1
            wmsplit['TTbarFS_ID']=1
            wmsplit['RECODR2_50nsreHLT']=5
            wmsplit['RECODR2_25nsreHLT']=5
            wmsplit['RECODR2_2016reHLT']=5
            wmsplit['RECODR2_50nsreHLT_HIPM']=5
            wmsplit['RECODR2_25nsreHLT_HIPM']=5
            wmsplit['RECODR2_2016reHLT_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimSingleMu']=1
            wmsplit['RECODR2_2016reHLT_skimDoubleEG']=1
            wmsplit['RECODR2_2016reHLT_skimMuonEG']=1
            wmsplit['RECODR2_2016reHLT_skimJetHT']=1
            wmsplit['RECODR2_2016reHLT_skimMET']=1
            wmsplit['RECODR2_2016reHLT_skimSinglePh']=1
            wmsplit['RECODR2_2016reHLT_skimMuOnia']=1
            wmsplit['RECODR2_2016reHLT_skimSingleMu_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimDoubleEG_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimMuonEG_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimJetHT_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimMET_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimSinglePh_HIPM']=1
            wmsplit['RECODR2_2016reHLT_skimMuOnia_HIPM']=1
            wmsplit['RECODR2_2017reHLT_Prompt']=1
            wmsplit['RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi']=1
            wmsplit['RECODR2_2017reHLT_skimDoubleEG_Prompt']=1
            wmsplit['RECODR2_2017reHLT_skimMET_Prompt']=1
            wmsplit['RECODR2_2017reHLT_skimMuOnia_Prompt']=1
            wmsplit['RECODR2_2017reHLT_Prompt_L1TEgDQM']=1
            wmsplit['RECODR2_2018reHLT_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi']=1
            wmsplit['RECODR2_2018reHLT_skimDoubleEG_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimMET_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimMuOnia_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM']=1
            wmsplit['RECODR2_2018reHLT_skimMuonEG_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimCharmonium_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_HEfail']=1
            wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig']=1
            wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Prompt']=1
            wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Prompt']=1
            wmsplit['RECODR2_2018reHLT_ZBPrompt']=1
            wmsplit['RECODR2_2018reHLT_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimSingleMu_Offline_Lumi']=1
            wmsplit['RECODR2_2018reHLT_skimDoubleEG_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimJetHT_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimMET_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimMuOnia_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM']=1
            wmsplit['RECODR2_2018reHLT_skimMuonEG_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimCharmonium_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_HEfail']=1
            wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig']=1
            wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Offline']=1
            wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Offline']=1
            wmsplit['RECODR2_2018reHLT_ZBOffline']=1
            wmsplit['HLTDR2_50ns']=1
            wmsplit['HLTDR2_25ns']=1
            wmsplit['HLTDR2_2016']=1
            wmsplit['HLTDR2_2017']=1
            wmsplit['HLTDR2_2018']=1
            wmsplit['HLTDR2_2018_BadHcalMitig']=1
            wmsplit['Hadronizer']=1
            wmsplit['DIGIUP15']=1
            wmsplit['RECOUP15']=1
            wmsplit['RECOAODUP15']=5
            wmsplit['DBLMINIAODMCUP15NODQM']=5
            wmsplit['DigiFull']=5
            wmsplit['RecoFull']=5
            wmsplit['DigiFullPU']=1
            wmsplit['RecoFullPU']=1
            wmsplit['RECOHID11']=1
            wmsplit['DigiFullTriggerPU_2023D17PU']=1
            wmsplit['RecoFullGlobalPU_2023D17PU']=1
            wmsplit['DIGIUP17']=1
            wmsplit['RECOUP17']=1
            wmsplit['DIGIUP17_PU25']=1
            wmsplit['RECOUP17_PU25']=1
            wmsplit['DIGICOS_UP16']=1
            wmsplit['RECOCOS_UP16']=1
            wmsplit['DIGICOS_UP17']=1
            wmsplit['RECOCOS_UP17']=1
            wmsplit['DIGICOS_UP18']=1
            wmsplit['RECOCOS_UP18']=1
            wmsplit['HYBRIDRepackHI2015VR']=1
            wmsplit['HYBRIDZSHI2015']=1
            wmsplit['RECOHID15']=1
            wmsplit['RECOHID18']=1
        except Exception as ex:
            print('Exception while building a wmsplit dictionary: %s' % (str(ex)))
            return {}

        return wmsplit

    def prepare(self, mReader, directories, mode='init'):
        wmsplit = MatrixInjector.get_wmsplit()
        acqEra=False
        for (n,dir) in directories.items():
            chainDict=copy.deepcopy(self.defaultChain)
            print("inspecting",dir)
            nextHasDSInput=None
            for (x,s) in mReader.workFlowSteps.items():
                #x has the format (num, prefix)
                #s has the format (num, name, commands, stepList)
                if x[0]==n:
                    #print "found",n,s[3]
                    #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                    index=0
                    splitForThisWf=None
                    thisLabel=self.speciallabel
                    #if 'HARVESTGEN' in s[3]:
                    if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
                        chainDict['TimePerEvent']=0.01
                        thisLabel=thisLabel+"_gen"
                    # for double miniAOD test
                    if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
                        thisLabel=thisLabel+"_dblMiniAOD"
                    processStrPrefix=''
                    setPrimaryDs=None
                    nanoedmGT=''
                    for step in s[3]:

                        if 'INPUT' in step or (not isinstance(s[2][index],str)):
                            nextHasDSInput=s[2][index]

                        else:

                            if (index==0):
                                #first step and not input -> gen part
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print("Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run, or the cfg was not created")
                                    return -15

                                chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
                                if not '--relval' in s[2][index]:
                                    print('Impossible to create task from scratch without splitting information with --relval')
                                    return -12
                                else:
                                    arg=s[2][index].split()
                                    ns=list(map(int,arg[arg.index('--relval')+1].split(',')))
                                    chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
                                    chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
                                if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
                                    thisLabel+='_FastSim'
                                if 'lhe' in s[2][index]:
                                    chainDict['nowmTasklist'][-1]['LheInputFiles'] = True

                            elif nextHasDSInput:
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print("Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run, or the cfg was not created")
                                    return -15
                                chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
                                if 'DQMHLTonRAWAOD' in step:
                                    chainDict['nowmTasklist'][-1]['IncludeParents']=True
                                splitForThisWf=nextHasDSInput.split
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                                if step in wmsplit:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
                                # get the run numbers or #events
                                if len(nextHasDSInput.run):
                                    chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
                                if len(nextHasDSInput.ls):
                                    chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
                                #print "what is s",s[2][index]
                                if '--data' in s[2][index] and nextHasDSInput.label:
                                    thisLabel+='_RelVal_%s'%nextHasDSInput.label
                                if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
                                    print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
                                    processStrPrefix='PU_'
                                    setPrimaryDs = 'RelVal'+s[1].split('+')[0]
                                    if setPrimaryDs:
                                        chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
                                nextHasDSInput=None
                            else:
                                #not first step and no inputDS
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print("Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run, or the cfg was not created")
                                    return -15
                                if splitForThisWf:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                                if step in wmsplit:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]

                            # change LumisPerJob for Hadronizer steps.
                            if 'Hadronizer' in step:
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']

                            #print step
                            chainDict['nowmTasklist'][-1]['TaskName']=step
                            if setPrimaryDs:
                                chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
                            chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
                            chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
                            chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
                            if 'NANOEDM' in step:
                                nanoedmGT = chainDict['nowmTasklist'][-1]['nowmIO']['GT']
                            if 'NANOMERGE' in step:
                                chainDict['GlobalTag'] = nanoedmGT
                            if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
                                chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
                            if '--pileup ' in s[2][index]: # catch --pileup (scenario) and not --pileup_ (dataset to be mixed) => works also when making a PRE-MIXed dataset
                                processStrPrefix='PU_' # take care of pu overlay done with GEN-SIM mixing
                                if ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('25ns') > 0:
                                    processStrPrefix='PU25ns_'
                                elif ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('50ns') > 0:
                                    processStrPrefix='PU50ns_'
                            if 'premix_stage2' in s[2][index] and '--pileup_input' in s[2][index]: # take care of pu overlay done with DIGI mixing of premixed events
                                if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('25ns') > 0:
                                    processStrPrefix='PUpmx25ns_'
                                elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('50ns') > 0:
                                    processStrPrefix='PUpmx50ns_'

                            if acqEra:
                                #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                                chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
                                chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
                                if 'NANOMERGE' in step:
                                    chainDict['ProcessingString'][step]=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
                            else:
                                #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                                chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
                                chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
                                if 'NANOMERGE' in step:
                                    chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel

                            if self.batchName:
                                chainDict['nowmTasklist'][-1]['Campaign'] = chainDict['nowmTasklist'][-1]['AcquisitionEra']+self.batchName

                            # specify different ProcessingString for double miniAOD dataset
                            if 'DBLMINIAODMCUP15NODQM' in step:
                                chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD'

                            if chainDict['nowmTasklist'][-1]['Multicore']:
                                # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
                                # the number of threads is NO LONGER assumed to be the same for all tasks
                                # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
                                # now changed to 1.5GB / additional thread according to discussion:
                                # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
                                #chainDict['nowmTasklist'][-1]['Memory'] = 3000 + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 )*1500
                                chainDict['nowmTasklist'][-1]['Memory'] = self.memoryOffset + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 ) * self.memPerCore

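                                # Worked example of the line above: with a memoryOffset of
                                # 3000 MB and a memPerCore of 1500 MB (the values in the
                                # commented-out line), a 4-thread task requests
                                # 3000 + (4-1)*1500 = 7500 MB.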
                        index+=1
                    #end of loop through steps
                    chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                    if processStrPrefix or thisLabel:
                        chainDict['RequestString']+='_'+processStrPrefix+thisLabel

                    ### PrepID
                    chainDict['PrepID'] = chainDict['CMSSWVersion']+'__'+self.batchTime+'-'+s[1].split('+')[0]
                    if self.batchName:
                        chainDict['PrepID'] = chainDict['CMSSWVersion']+self.batchName+'-'+s[1].split('+')[0]
                        if 'HIN' in self.batchName:
                            chainDict['SubRequestType'] = "HIRelVal"

            #wrap up for this one
            import pprint
            #print 'wrapping up'
            #pprint.pprint(chainDict)
            #loop on the task list
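            # Walk the tasks from last to first: for each task whose nowmIO
            # declares a 'primary' input file, find the earlier task whose
            # output module produced that file and wire up InputTask /
            # InputFromOutputModule accordingly. A HARVEST task additionally
            # promotes its config to the request-level DQMConfigCacheID.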
            for i_second in reversed(range(len(chainDict['nowmTasklist']))):
                t_second=chainDict['nowmTasklist'][i_second]
                #print "t_second taskname", t_second['TaskName']
                if 'primary' in t_second['nowmIO']:
                    #print t_second['nowmIO']['primary']
                    primary=t_second['nowmIO']['primary'][0].replace('file:','')
                    for i_input in reversed(range(0,i_second)):
                        t_input=chainDict['nowmTasklist'][i_input]
                        for (om,o) in t_input['nowmIO'].items():
                            if primary in o:
                                #print "found",primary,"produced by",om,"of",t_input['TaskName']
                                t_second['InputTask'] = t_input['TaskName']
                                t_second['InputFromOutputModule'] = om
                                #print 't_second',pprint.pformat(t_second)
                                if t_second['TaskName'].startswith('HARVEST'):
                                    chainDict.update(copy.deepcopy(self.defaultHarvest))
                                    chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
                                    ## the info is not in the task-specific dict but in the general dict
                                    #t_input.update(copy.deepcopy(self.defaultHarvest))
                                    #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
                                break

            # agreed changes for wm injection:
            # - Campaign: *optional* string during creation. It will default to the AcqEra value if possible.
            #   Otherwise it will be empty.
            # - AcquisitionEra: *mandatory* string at request level during creation. *optional* string
            #   at task level during creation. "optional" during assignment.
            # - ProcessingString: *mandatory* string at request level during creation. *optional* string
            #   at task level during creation. "optional" during assignment.
            # - ProcessingVersion: *optional* during creation (default 1). *optional* during assignment.
            #
            # Which requires the following changes here:
            # - reset the global AcquisitionEra and ProcessingString to be the ones of the first task
            # - and also set Campaign to always be the same as the AcquisitionEra

            if acqEra:
                chainDict['AcquisitionEra'] = list(chainDict['AcquisitionEra'].values())[0]
                chainDict['ProcessingString'] = list(chainDict['ProcessingString'].values())[0]
            else:
                chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
                chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']

            ##### batch name appended to Campaign name
            #chainDict['Campaign'] = chainDict['AcquisitionEra']
            chainDict['Campaign'] = chainDict['AcquisitionEra']+self.batchName

            ## clean things up now
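            # --keep semantics: self.keep may hold task indices or task names, and
            # matching tasks get KeepOutput=True; without --keep every non-HARVEST
            # task keeps its output (HYBRIDRepackHI2015VR is always forced to False).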
            itask=0
            if self.keep:
                for i in self.keep:
                    if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
                        chainDict['nowmTasklist'][i]['KeepOutput']=True
            for (i,t) in enumerate(chainDict['nowmTasklist']):
                if t['TaskName'].startswith('HARVEST'):
                    continue
                if not self.keep:
                    t['KeepOutput']=True
                elif t['TaskName'] in self.keep:
                    t['KeepOutput']=True
                if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
                    t['KeepOutput']=False
                t.pop('nowmIO')
                itask+=1
                chainDict['Task%d'%(itask)]=t

            ## provide the number of tasks
            chainDict['TaskChain']=itask #len(chainDict['nowmTasklist'])

            chainDict.pop('nowmTasklist')
            self.chainDicts[n]=chainDict

        return 0

    def uploadConf(self,filePath,label,where):
        labelInCouch=self.label+'_'+label
        cacheName=filePath.split('/')[-1]
        if self.testMode:
            self.count+=1
            print('\tFake upload of',filePath,'to couch with label',labelInCouch)
            return self.count
        else:
            try:
                from modules.wma import upload_to_couch,DATABASE_NAME
            except:
                print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
                print('\n\t QUIT\n')
                sys.exit(-16)

            if cacheName in self.couchCache:
                print("Not re-uploading",filePath,"to",where,"for",label)
                cacheId=self.couchCache[cacheName]
            else:
                print("Loading",filePath,"to",where,"for",label)
                ## totally fork the upload to couch to prevent cross loading of process configurations
                pool = multiprocessing.Pool(1)
                cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
                cacheId = cacheIds[0]
                self.couchCache[cacheName]=cacheId
            return cacheId

    def upload(self):
        for (n,d) in self.chainDicts.items():
            for it in d:
                if it.startswith("Task") and it!='TaskChain':
                    #upload
                    couchID=self.uploadConf(d[it]['ConfigCacheID'],
                                            str(n)+d[it]['TaskName'],
                                            d['ConfigCacheUrl']
                                            )
                    print(d[it]['ConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID)
                    d[it]['ConfigCacheID']=couchID
                if it == 'DQMConfigCacheID':
                    couchID=self.uploadConf(d['DQMConfigCacheID'],
                                            str(n)+'harvesting',
                                            d['ConfigCacheUrl']
                                            )
                    print(d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID)
                    d['DQMConfigCacheID']=couchID

    def submit(self):
        try:
            from modules.wma import makeRequest,approveRequest
            from wmcontrol import random_sleep
            print('\n\tFound wmcontrol\n')
        except:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-17)

        import pprint
        for (n,d) in self.chainDicts.items():
            if self.testMode:
                print("Only viewing request",n)
                pprint.pprint(d)
            else:
                #submit to wmagent each dict
                print("For eyes before submitting",n)
                pprint.pprint(d)
                print("Submitting",n,"...........")
                workFlow=makeRequest(self.wmagent,d,encodeDict=True)
                print("...........",n,"submitted")
                random_sleep()

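# A minimal usage sketch, not part of this module: a driver such as
# runTheMatrix.py is assumed to build the parsed options `opt` and a matrix
# reader `mReader`, then run the three phases in order.
#
#   injector = MatrixInjector(opt, mode=opt.wmcontrol)
#   injector.prepare(mReader, directories)   # build one TaskChain dict per workflow
#   injector.upload()                        # push the step configs to CouchDB
#   injector.submit()                        # inject the requests into the WMAgent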