CMS 3D CMS Logo

MatrixInjector.py
Go to the documentation of this file.
1 from __future__ import print_function
2 import sys
3 import json
4 import os
5 import copy
6 import multiprocessing
7 import time
8 import re
9 
# Upper bound on the length of the candidate workflow name
# "<user>_<RequestString>": prepare() compares each candidate against it, and
# submit() warns about workflow names longer than this as too long for
# submission.
10 MAXWORKFLOWLENGTH = 81
11 
13  if opt.show:
14  print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
15  sys.exit(-1)
16  if opt.wmcontrol=='init':
17  #init means it'll be in test mode
18  opt.nProcs=0
19  if opt.wmcontrol=='test':
20  #means the wf were created already, and we just dryRun it.
21  opt.dryRun=True
22  if opt.wmcontrol=='submit' and opt.nProcs==0:
23  print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
24  sys.exit(-1)
25  if opt.wmcontrol=='force':
26  print("This is an expert setting, you'd better know what you're doing")
27  opt.dryRun=True
28 
def upload_to_couch_oneArg(arguments):
    """Upload one config file to the CouchDB config cache.

    Single-argument adapter around ``modules.wma.upload_to_couch`` so that it
    can be dispatched through ``multiprocessing.Pool.map``.

    :param arguments: tuple ``(filePath, labelInCouch, user, group, where)``;
        ``where`` is passed on as the ``url`` keyword.
    :returns: whatever ``upload_to_couch`` returns (used as the cache ID).
    """
    from modules.wma import upload_to_couch
    file_path, couch_label, requestor, requestor_group, couch_url = arguments
    return upload_to_couch(file_path, couch_label, requestor, requestor_group,
                           test_mode=False, url=couch_url)
39 
40 
42 
43  def __init__(self,opt,mode='init',options=''):
44  self.count=1040
45 
46  self.dqmgui=None
47  self.wmagent=None
48  for k in options.split(','):
49  if k.startswith('dqm:'):
50  self.dqmgui=k.split(':',1)[-1]
51  elif k.startswith('wma:'):
52  self.wmagent=k.split(':',1)[-1]
53 
54  self.testMode=((mode!='submit') and (mode!='force'))
55  self.version =1
56  self.keep = opt.keep
57  self.memoryOffset = opt.memoryOffset
58  self.memPerCore = opt.memPerCore
59  self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
60  self.numberOfStreams = 0
61  if(opt.nStreams>0):
62  self.numberOfStreams = opt.nStreams
63  self.batchName = ''
64  self.batchTime = str(int(time.time()))
65  if(opt.batchName):
66  self.batchName = '__'+opt.batchName+'-'+self.batchTime
67 
68  #wagemt stuff
69  if not self.wmagent:
70  self.wmagent=os.getenv('WMAGENT_REQMGR')
71  if not self.wmagent:
72  if not opt.testbed :
73  self.wmagent = 'cmsweb.cern.ch'
74  self.DbsUrl = "https://"+self.wmagent+"/dbs/prod/global/DBSReader"
75  else :
76  self.wmagent = 'cmsweb-testbed.cern.ch'
77  self.DbsUrl = "https://"+self.wmagent+"/dbs/int/global/DBSReader"
78 
79  if not self.dqmgui:
80  self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
81  #couch stuff
82  self.couch = 'https://'+self.wmagent+'/couchdb'
83 # self.couchDB = 'reqmgr_config_cache'
84  self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
85  self.user = os.getenv('USER')
86  self.group = 'ppd'
87  self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
88  self.speciallabel=''
89  if opt.label:
90  self.speciallabel= '_'+opt.label
91  self.longWFName = []
92 
93  if not os.getenv('WMCORE_ROOT'):
94  print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
95  if not self.testMode:
96  print('\n\t QUIT\n')
97  sys.exit(-18)
98  else:
99  print('\n\tFound wmclient\n')
100 
102  "RequestType" : "TaskChain", #this is how we handle relvals
103  "SubRequestType" : "RelVal", #this is how we handle relvals, now that TaskChain is also used for central MC production
104  "RequestPriority": 500000,
105  "Requestor": self.user, #Person responsible
106  "Group": self.group, #group for the request
107  "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
108  "Campaign": os.getenv('CMSSW_VERSION'), # = AcquisitionEra, will be reset later to the one of first task, will both be the CMSSW_VERSION
109  "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain)
110  "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain)
111  "GlobalTag": None, #Global Tag (overridden per task)
112  "ConfigCacheUrl": self.couch, #URL of CouchDB containing Config Cache
113  "DbsUrl": self.DbsUrl,
114  #- Will contain all configs for all Tasks
115  #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
116  "TaskChain" : None, #Define number of tasks in chain.
117  "nowmTasklist" : [], #a list of tasks as we put them in
118  "Multicore" : 1, # do not set multicore for the whole chain
119  "Memory" : 3000,
120  "SizePerEvent" : 1234,
121  "TimePerEvent" : 10,
122  "PrepID": os.getenv('CMSSW_VERSION')
123  }
124 
126  "EnableHarvesting" : "True",
127  "DQMUploadUrl" : self.dqmgui,
128  "DQMConfigCacheID" : None,
129  "Multicore" : 1 # hardcode Multicore to be 1 for Harvest
130  }
131 
133  "TaskName" : None, #Task Name
134  "ConfigCacheID" : None, #Generator Config id
135  "GlobalTag": None,
136  "SplittingAlgo" : "EventBased", #Splitting Algorithm
137  "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm
138  "EventsPerLumi" : None,
139  "RequestNumEvents" : None, #Total number of events to generate
140  "Seeding" : "AutomaticSeeding", #Random seeding method
141  "PrimaryDataset" : None, #Primary Dataset to be created
142  "nowmIO": {},
143  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
144  "EventStreams": self.numberOfStreams,
145  "KeepOutput" : False
146  }
148  "TaskName" : "DigiHLT", #Task Name
149  "ConfigCacheID" : None, #Processing Config id
150  "GlobalTag": None,
151  "InputDataset" : None, #Input Dataset to be processed
152  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
153  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
154  "nowmIO": {},
155  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
156  "EventStreams": self.numberOfStreams,
157  "KeepOutput" : False
158  }
159  self.defaultTask={
160  "TaskName" : None, #Task Name
161  "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry)
162  "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process
163  "ConfigCacheID" : None, #Processing Config id
164  "GlobalTag": None,
165  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
166  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
167  "nowmIO": {},
168  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
169  "EventStreams": self.numberOfStreams,
170  "KeepOutput" : False
171  }
172 
173  self.chainDicts={}
174 
@staticmethod
def get_wmsplit():
    """
    Return a "wmsplit" dictionary that contains non-default LumisPerJob values.

    Keys are step names; values are the LumisPerJob that prepare() assigns to
    tasks running that step instead of the default splitting.

    The hard-coded entries below are always returned.  Entries for phase-2
    pileup steps ('DigiTriggerPU_<key>' and 'RecoGlobalPU_<key>' for every
    key of upgradeKeys[2026] containing 'PU') are added on top when
    upgradeWorkflowComponents can be imported and iterated; if that fails, a
    message is printed and only the hard-coded entries are returned, so one
    broken optional import no longer wipes out every override.
    """
    wmsplit = {
        'DIGIHI': 5,
        'RECOHI': 5,
        'HLTD': 5,
        'RECODreHLT': 2,
        'DIGIPU': 4,
        'DIGIPU1': 4,
        'RECOPU1': 1,
        'DIGIUP15_PU50': 1,
        'RECOUP15_PU50': 1,
        'DIGIUP15_PU25': 1,
        'RECOUP15_PU25': 1,
        'DIGIUP15_PU25HS': 1,
        'RECOUP15_PU25HS': 1,
        'DIGIHIMIX': 5,
        'RECOHIMIX': 5,
        'RECODSplit': 1,
        'SingleMuPt10_UP15_ID': 1,
        'DIGIUP15_ID': 1,
        'RECOUP15_ID': 1,
        'TTbar_13_ID': 1,
        'SingleMuPt10FS_ID': 1,
        'TTbarFS_ID': 1,
        'RECODR2_50nsreHLT': 5,
        'RECODR2_25nsreHLT': 5,
        'RECODR2_2016reHLT': 5,
        'RECODR2_50nsreHLT_HIPM': 5,
        'RECODR2_25nsreHLT_HIPM': 5,
        'RECODR2_2016reHLT_HIPM': 1,
        'RECODR2_2016reHLT_skimSingleMu': 1,
        'RECODR2_2016reHLT_skimDoubleEG': 1,
        'RECODR2_2016reHLT_skimMuonEG': 1,
        'RECODR2_2016reHLT_skimJetHT': 1,
        'RECODR2_2016reHLT_skimMET': 1,
        'RECODR2_2016reHLT_skimSinglePh': 1,
        'RECODR2_2016reHLT_skimMuOnia': 1,
        'RECODR2_2016reHLT_skimSingleMu_HIPM': 1,
        'RECODR2_2016reHLT_skimDoubleEG_HIPM': 1,
        'RECODR2_2016reHLT_skimMuonEG_HIPM': 1,
        'RECODR2_2016reHLT_skimJetHT_HIPM': 1,
        'RECODR2_2016reHLT_skimMET_HIPM': 1,
        'RECODR2_2016reHLT_skimSinglePh_HIPM': 1,
        'RECODR2_2016reHLT_skimMuOnia_HIPM': 1,
        'RECODR2_2017reHLT_Prompt': 1,
        'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi': 1,
        'RECODR2_2017reHLT_skimDoubleEG_Prompt': 1,
        'RECODR2_2017reHLT_skimMET_Prompt': 1,
        'RECODR2_2017reHLT_skimMuOnia_Prompt': 1,
        'RECODR2_2017reHLT_Prompt_L1TEgDQM': 1,
        'RECODR2_2018reHLT_Prompt': 1,
        'RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi': 1,
        'RECODR2_2018reHLT_skimDoubleEG_Prompt': 1,
        'RECODR2_2018reHLT_skimJetHT_Prompt': 1,
        'RECODR2_2018reHLT_skimMET_Prompt': 1,
        'RECODR2_2018reHLT_skimMuOnia_Prompt': 1,
        'RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM': 1,
        'RECODR2_2018reHLT_skimMuonEG_Prompt': 1,
        'RECODR2_2018reHLT_skimCharmonium_Prompt': 1,
        'RECODR2_2018reHLT_skimJetHT_Prompt_HEfail': 1,
        'RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig': 1,
        'RECODR2_2018reHLTAlCaTkCosmics_Prompt': 1,
        'RECODR2_2018reHLT_skimDisplacedJet_Prompt': 1,
        'RECODR2_2018reHLT_ZBPrompt': 1,
        'RECODR2_2018reHLT_Offline': 1,
        'RECODR2_2018reHLT_skimSingleMu_Offline_Lumi': 1,
        'RECODR2_2018reHLT_skimDoubleEG_Offline': 1,
        'RECODR2_2018reHLT_skimJetHT_Offline': 1,
        'RECODR2_2018reHLT_skimMET_Offline': 1,
        'RECODR2_2018reHLT_skimMuOnia_Offline': 1,
        'RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM': 1,
        'RECODR2_2018reHLT_skimMuonEG_Offline': 1,
        'RECODR2_2018reHLT_skimCharmonium_Offline': 1,
        'RECODR2_2018reHLT_skimJetHT_Offline_HEfail': 1,
        'RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig': 1,
        'RECODR2_2018reHLTAlCaTkCosmics_Offline': 1,
        'RECODR2_2018reHLT_skimDisplacedJet_Offline': 1,
        'RECODR2_2018reHLT_ZBOffline': 1,
        'HLTDR2_50ns': 1,
        'HLTDR2_25ns': 1,
        'HLTDR2_2016': 1,
        'HLTDR2_2017': 1,
        'HLTDR2_2018': 1,
        'HLTDR2_2018_BadHcalMitig': 1,
        'Hadronizer': 1,
        'DIGIUP15': 1,
        'RECOUP15': 1,
        'RECOAODUP15': 5,
        'DBLMINIAODMCUP15NODQM': 5,
        'Digi': 5,
        'Reco': 5,
        'DigiPU': 1,
        'RecoPU': 1,
        'RECOHID11': 1,
        'DIGIUP17': 1,
        'RECOUP17': 1,
        'DIGIUP17_PU25': 1,
        'RECOUP17_PU25': 1,
        'DIGICOS_UP16': 1,
        'RECOCOS_UP16': 1,
        'DIGICOS_UP17': 1,
        'RECOCOS_UP17': 1,
        'DIGICOS_UP18': 1,
        'RECOCOS_UP18': 1,
        'DIGICOS_UP21': 1,
        'RECOCOS_UP21': 1,
        'HYBRIDRepackHI2015VR': 1,
        'HYBRIDZSHI2015': 1,
        'RECOHID15': 1,
        'RECOHID18': 1,
    }

    # Automate the phase-2 PU steps: one lumi per job for the DigiTrigger and
    # RecoGlobal steps of every PU upgrade key.  Built into a separate dict so
    # a failure part-way through leaves no partial entries behind.
    try:
        from .upgradeWorkflowComponents import upgradeKeys
        phase2 = {}
        for key in upgradeKeys[2026]:
            if 'PU' not in key:
                continue
            phase2['DigiTriggerPU_' + key] = 1
            phase2['RecoGlobalPU_' + key] = 1
    except Exception as ex:
        # Only the generated phase-2 entries are lost; keep the static table.
        print('Exception while building a wmsplit dictionary: %s' % (str(ex)))
    else:
        wmsplit.update(phase2)

    return wmsplit
304 
305  def prepare(self, mReader, directories, mode='init'):
306  wmsplit = MatrixInjector.get_wmsplit()
307  acqEra=False
308  for (n,dir) in directories.items():
309  chainDict=copy.deepcopy(self.defaultChain)
310  print("inspecting",dir)
311  nextHasDSInput=None
312  for (x,s) in mReader.workFlowSteps.items():
313  #x has the format (num, prefix)
314  #s has the format (num, name, commands, stepList)
315  if x[0]==n:
316  #print "found",n,s[3]
317  #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
318  index=0
319  splitForThisWf=None
320  thisLabel=self.speciallabel
321  #if 'HARVESTGEN' in s[3]:
322  if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
323  chainDict['TimePerEvent']=0.01
324  thisLabel=thisLabel+"_gen"
325  # for double miniAOD test
326  if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
327  thisLabel=thisLabel+"_dblMiniAOD"
328  processStrPrefix=''
329  setPrimaryDs=None
330  nanoedmGT=''
331  for step in s[3]:
332 
333  if 'INPUT' in step or (not isinstance(s[2][index],str)):
334  nextHasDSInput=s[2][index]
335 
336  else:
337 
338  if (index==0):
339  #first step and not input -> gen part
340  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
341  try:
342  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
343  except:
344  print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
345  return -15
346 
347  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
348  if not '--relval' in s[2][index]:
349  print('Impossible to create task from scratch without splitting information with --relval')
350  return -12
351  else:
352  arg=s[2][index].split()
353  ns=list(map(int,arg[len(arg) - arg[-1::-1].index('--relval')].split(',')))
354  chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
355  chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
356  chainDict['nowmTasklist'][-1]['EventsPerLumi'] = ns[1]
357  #overwrite EventsPerLumi if numberEventsInLuminosityBlock is set in cmsDriver
358  if 'numberEventsInLuminosityBlock' in s[2][index]:
359  nEventsInLuminosityBlock = re.findall('process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\(([ 0-9 ]*)\)', s[2][index],re.DOTALL)
360  if nEventsInLuminosityBlock[-1].isdigit() and int(nEventsInLuminosityBlock[-1]) < ns[1]:
361  chainDict['nowmTasklist'][-1]['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
362  if(self.numberEventsInLuminosityBlock > 0 and self.numberEventsInLuminosityBlock <= ns[1]):
363  chainDict['nowmTasklist'][-1]['EventsPerLumi'] = self.numberEventsInLuminosityBlock
364  if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
365  thisLabel+='_FastSim'
366  if 'lhe' in s[2][index] in s[2][index]:
367  chainDict['nowmTasklist'][-1]['LheInputFiles'] =True
368 
369  elif nextHasDSInput:
370  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
371  try:
372  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
373  except:
374  print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
375  return -15
376  chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
377  if ('DQMHLTonRAWAOD' in step) :
378  chainDict['nowmTasklist'][-1]['IncludeParents']=True
379  splitForThisWf=nextHasDSInput.split
380  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
381  if step in wmsplit:
382  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
383  # get the run numbers or #events
384  if len(nextHasDSInput.run):
385  chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
386  if len(nextHasDSInput.ls):
387  chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
388  #print "what is s",s[2][index]
389  if '--data' in s[2][index] and nextHasDSInput.label:
390  thisLabel+='_RelVal_%s'%nextHasDSInput.label
391  if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
392  print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
393  processStrPrefix='PU_'
394  setPrimaryDs = 'RelVal'+s[1].split('+')[0]
395  if setPrimaryDs:
396  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
397  nextHasDSInput=None
398  else:
399  #not first step and no inputDS
400  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
401  try:
402  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
403  except:
404  print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
405  return -15
406  if splitForThisWf:
407  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
408  if step in wmsplit:
409  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
410 
411  # change LumisPerJob for Hadronizer steps.
412  if 'Hadronizer' in step:
413  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']
414 
415  #print step
416  chainDict['nowmTasklist'][-1]['TaskName']=step
417  if setPrimaryDs:
418  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
419  chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
420  chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
421  chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
422  if 'NANOEDM' in step :
423  nanoedmGT = chainDict['nowmTasklist'][-1]['nowmIO']['GT']
424  if 'NANOMERGE' in step :
425  chainDict['GlobalTag'] = nanoedmGT
426  if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
427  chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
428  if '--pileup ' in s[2][index]: # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
429  processStrPrefix='PU_' # take care of pu overlay done with GEN-SIM mixing
430  if ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('25ns') > 0 :
431  processStrPrefix='PU25ns_'
432  elif ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('50ns') > 0 :
433  processStrPrefix='PU50ns_'
434  if 'premix_stage2' in s[2][index] and '--pileup_input' in s[2][index]: # take care of pu overlay done with DIGI mixing of premixed events
435  if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('25ns') > 0 :
436  processStrPrefix='PUpmx25ns_'
437  elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('50ns') > 0 :
438  processStrPrefix='PUpmx50ns_'
439 
440  if acqEra:
441  #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
442  chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
443  chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
444  if 'NANOMERGE' in step :
445  chainDict['ProcessingString'][step]=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
446  else:
447  #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
448  chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
449  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
450  if 'NANOMERGE' in step :
451  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
452 
453  if (self.batchName):
454  chainDict['nowmTasklist'][-1]['Campaign'] = chainDict['nowmTasklist'][-1]['AcquisitionEra']+self.batchName
455 
456  # specify different ProcessingString for double miniAOD dataset
457  if ('DBLMINIAODMCUP15NODQM' in step):
458  chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD'
459 
460  if( chainDict['nowmTasklist'][-1]['Multicore'] ):
461  # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
462  # the number of threads is NO LONGER assumed to be the same for all tasks
463  # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
464  # now change to 1.5GB / additional thread according to discussion:
465  # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
466 # chainDict['nowmTasklist'][-1]['Memory'] = 3000 + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 )*1500
467  chainDict['nowmTasklist'][-1]['Memory'] = self.memoryOffset + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 ) * self.memPerCore
468 
469  index+=1
470  #end of loop through steps
471  chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
472  if processStrPrefix or thisLabel:
473  chainDict['RequestString']+='_'+processStrPrefix+thisLabel
474  #check candidate WF name
475  self.candidateWFName = self.user+'_'+chainDict['RequestString']
476  if (len(self.candidateWFName)>MAXWORKFLOWLENGTH):
478 
479 
480  chainDict['PrepID'] = chainDict['CMSSWVersion']+'__'+self.batchTime+'-'+s[1].split('+')[0]
481  if(self.batchName):
482  chainDict['PrepID'] = chainDict['CMSSWVersion']+self.batchName+'-'+s[1].split('+')[0]
483  if( 'HIN' in self.batchName ):
484  chainDict['SubRequestType'] = "HIRelVal"
485 
486  #wrap up for this one
487  import pprint
488  #print 'wrapping up'
489  #pprint.pprint(chainDict)
490  #loop on the task list
491  for i_second in reversed(range(len(chainDict['nowmTasklist']))):
492  t_second=chainDict['nowmTasklist'][i_second]
493  #print "t_second taskname", t_second['TaskName']
494  if 'primary' in t_second['nowmIO']:
495  #print t_second['nowmIO']['primary']
496  primary=t_second['nowmIO']['primary'][0].replace('file:','')
497  for i_input in reversed(range(0,i_second)):
498  t_input=chainDict['nowmTasklist'][i_input]
499  for (om,o) in t_input['nowmIO'].items():
500  if primary in o:
501  #print "found",primary,"procuced by",om,"of",t_input['TaskName']
502  #ad-hoc fix due to restriction in TaskName of 50 characters
503  if (len(t_input['TaskName'])>50):
504  if (t_input['TaskName'].find('GenSim') != -1):
505  t_input['TaskName'] = 'GenSimFull'
506  if (t_input['TaskName'].find('Hadronizer') != -1):
507  t_input['TaskName'] = 'HadronizerFull'
508  t_second['InputTask'] = t_input['TaskName']
509  t_second['InputFromOutputModule'] = om
510  #print 't_second',pprint.pformat(t_second)
511  if t_second['TaskName'].startswith('HARVEST'):
512  chainDict.update(copy.deepcopy(self.defaultHarvest))
513  if "_RD" in t_second['TaskName']:
514  chainDict['DQMHarvestUnit'] = "multiRun"
515  chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
516 
519  break
520 
521  # agreed changes for wm injection:
522  # - Campaign: *optional* string during creation. It will default to AcqEra value if possible.
523  # Otherwise it will be empty.
524  # - AcquisitionEra: *mandatory* string at request level during creation. *optional* string
525  # at task level during creation. "optional" during assignment.
526  # - ProcessingString: *mandatory* string at request level during creation. *optional* string
527  # at task level during creation. "optional" during assignment.
528  # - ProcessingVersion: *optional* during creation (default 1). *optional* during assignment.
529  #
530  # Which requires following changes here:
531  # - reset Global AcuisitionEra, ProcessingString to be the one in the first task
532  # - and also Campaign to be always the same as the AcquisitionEra
533 
534  if acqEra:
535  chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
536  chainDict['ProcessingString'] = chainDict['ProcessingString'].values()[0]
537  else:
538  chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
539  chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']
540 
541 
543  chainDict['Campaign'] = chainDict['AcquisitionEra']+self.batchName
544 
545 
546  itask=0
547  if self.keep:
548  for i in self.keep:
549  if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
550  chainDict['nowmTasklist'][i]['KeepOutput']=True
551  for (i,t) in enumerate(chainDict['nowmTasklist']):
552  if t['TaskName'].startswith('HARVEST'):
553  continue
554  if not self.keep:
555  t['KeepOutput']=True
556  elif t['TaskName'] in self.keep:
557  t['KeepOutput']=True
558  if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
559  t['KeepOutput']=False
560  t.pop('nowmIO')
561  itask+=1
562  chainDict['Task%d'%(itask)]=t
563 
564 
565 
566 
567 
568 
569  chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
570 
571  chainDict.pop('nowmTasklist')
572  self.chainDicts[n]=chainDict
573 
574 
575  return 0
576 
def uploadConf(self, filePath, label, where):
    """Upload one config file to the CouchDB config cache and return its ID.

    In test mode nothing is uploaded: ``self.count`` is incremented and
    returned as a fake ID.  Otherwise the upload runs through
    ``upload_to_couch_oneArg`` in a one-worker process pool, and the
    resulting ID is memoized in ``self.couchCache`` keyed by the file's base
    name, so a config whose file name was already uploaded is reused rather
    than uploaded again.

    :param filePath: path of the config file to upload
    :param label: suffix appended to ``self.label`` to form the couch label
    :param where: CouchDB URL to upload to
    :returns: the (fake or real) config cache ID
    """
    labelInCouch = self.label + '_' + label
    # Cache key is the last path component only (not the full path).
    cacheName = filePath.split('/')[-1]
    if self.testMode:
        self.count += 1
        print('\tFake upload of', filePath, 'to couch with label', labelInCouch)
        return self.count

    try:
        # Availability check for the wmcontrol client; the actual upload is
        # done in the worker process by upload_to_couch_oneArg.
        from modules.wma import upload_to_couch, DATABASE_NAME
    except ImportError:
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        print('\n\t QUIT\n')
        sys.exit(-16)

    if cacheName in self.couchCache:
        print("Not re-uploading", filePath, "to", where, "for", label)
        return self.couchCache[cacheName]

    print("Loading", filePath, "to", where, "for", label)
    pool = multiprocessing.Pool(1)
    try:
        cacheIds = pool.map(upload_to_couch_oneArg,
                            [(filePath, labelInCouch, self.user, self.group, where)])
    finally:
        # Release the worker process instead of leaking one pool per upload.
        pool.close()
        pool.join()
    cacheId = cacheIds[0]
    self.couchCache[cacheName] = cacheId
    return cacheId
603 
def upload(self):
    """Swap every local config path in the prepared requests for its couch ID.

    For each request in ``self.chainDicts``, the ``ConfigCacheID`` of every
    ``Task<N>`` entry (but not the ``TaskChain`` counter) and the
    request-level ``DQMConfigCacheID`` are passed to ``self.uploadConf``, and
    the returned ID overwrites the path in place.
    """
    for wf_num, request in self.chainDicts.items():
        for key in request:
            if key == 'DQMConfigCacheID':
                harvest_id = self.uploadConf(request['DQMConfigCacheID'],
                                             str(wf_num) + 'harvesting',
                                             request['ConfigCacheUrl'])
                print(request['DQMConfigCacheID'], "uploaded to couchDB for", str(wf_num), "with ID", harvest_id)
                request['DQMConfigCacheID'] = harvest_id
                continue
            if not key.startswith("Task") or key == 'TaskChain':
                continue
            task = request[key]
            task_id = self.uploadConf(task['ConfigCacheID'],
                                      str(wf_num) + task['TaskName'],
                                      request['ConfigCacheUrl'])
            print(task['ConfigCacheID'], " uploaded to couchDB for", str(wf_num), "with ID", task_id)
            task['ConfigCacheID'] = task_id
622 
623 
def submit(self):
    """Submit every prepared request, or only display them in test mode.

    Each request in ``self.chainDicts`` is pretty-printed.  Outside test mode
    it is then sent to ``self.wmagent`` with ``makeRequest`` and followed by a
    ``random_sleep``.  In test mode, any names held in ``self.longWFName`` are
    listed at the end as too long for submission.  If the wmcontrol modules
    cannot be imported, non-test mode exits with status -17.
    """
    try:
        from modules.wma import makeRequest, approveRequest
        from wmcontrol import random_sleep
        print('\n\tFound wmcontrol\n')
    except ImportError:
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-17)

    import pprint
    for (n, d) in self.chainDicts.items():
        if self.testMode:
            print("Only viewing request", n)
            # pprint.pprint writes the dump itself and returns None, so it
            # must not be wrapped in print().
            pprint.pprint(d)
        else:
            # submit to wmagent each dict
            print("For eyes before submitting", n)
            pprint.pprint(d)
            print("Submitting", n, "...........")
            makeRequest(self.wmagent, d, encodeDict=True)
            print("...........", n, "submitted")
            random_sleep()
    if self.testMode and len(self.longWFName) > 0:
        print("\n*** WARNING: " + str(len(self.longWFName)) + " workflows have too long names for submission (>" + str(MAXWORKFLOWLENGTH) + " characters) ***")
        print('\n'.join(self.longWFName))
MatrixInjector.MatrixInjector.upload
def upload(self)
Definition: MatrixInjector.py:604
MatrixInjector.MatrixInjector.longWFName
longWFName
Definition: MatrixInjector.py:91
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
resolutioncreator_cfi.object
object
Definition: resolutioncreator_cfi.py:4
MatrixInjector.MatrixInjector
Definition: MatrixInjector.py:41
MatrixInjector.MatrixInjector.defaultInput
defaultInput
Definition: MatrixInjector.py:147
MatrixInjector.MatrixInjector.keep
keep
Definition: MatrixInjector.py:56
MatrixInjector.MatrixInjector.memoryOffset
memoryOffset
Definition: MatrixInjector.py:57
MatrixInjector.MatrixInjector.couch
couch
Definition: MatrixInjector.py:82
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
MatrixInjector.MatrixInjector.DbsUrl
DbsUrl
Definition: MatrixInjector.py:74
MatrixInjector.MatrixInjector.couchCache
couchCache
Definition: MatrixInjector.py:84
spr::find
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
MatrixInjector.upload_to_couch_oneArg
def upload_to_couch_oneArg(arguments)
Definition: MatrixInjector.py:29
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
MatrixInjector.MatrixInjector.wmagent
wmagent
Definition: MatrixInjector.py:47
MatrixInjector.MatrixInjector.batchTime
batchTime
Definition: MatrixInjector.py:64
MatrixInjector.MatrixInjector.candidateWFName
candidateWFName
Definition: MatrixInjector.py:475
MatrixInjector.MatrixInjector.speciallabel
speciallabel
Definition: MatrixInjector.py:88
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
contentValuesCheck.values
values
Definition: contentValuesCheck.py:38
MatrixInjector.MatrixInjector.uploadConf
def uploadConf(self, filePath, label, where)
the info are not in the task specific dict but in the general dict t_input.update(copy....
Definition: MatrixInjector.py:577
str
#define str(s)
Definition: TestProcessor.cc:51
MatrixInjector.MatrixInjector.defaultHarvest
defaultHarvest
Definition: MatrixInjector.py:125
MatrixInjector.MatrixInjector.prepare
def prepare(self, mReader, directories, mode='init')
Definition: MatrixInjector.py:305
MatrixInjector.MatrixInjector.group
group
Definition: MatrixInjector.py:86
MatrixInjector.MatrixInjector.chainDicts
chainDicts
Definition: MatrixInjector.py:173
MatrixInjector.MatrixInjector.defaultTask
defaultTask
Definition: MatrixInjector.py:159
MatrixInjector.MatrixInjector.testMode
testMode
Definition: MatrixInjector.py:54
MatrixInjector.MatrixInjector.numberOfStreams
numberOfStreams
Definition: MatrixInjector.py:60
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
mps_setup.append
append
Definition: mps_setup.py:85
MatrixInjector.MatrixInjector.batchName
batchName
Definition: MatrixInjector.py:63
MatrixInjector.MatrixInjector.count
count
Definition: MatrixInjector.py:44
MatrixInjector.MatrixInjector.defaultScratch
defaultScratch
Definition: MatrixInjector.py:132
createfilelist.int
int
Definition: createfilelist.py:10
MatrixInjector.MatrixInjector.memPerCore
memPerCore
Definition: MatrixInjector.py:58
MatrixInjector.MatrixInjector.__init__
def __init__(self, opt, mode='init', options='')
Definition: MatrixInjector.py:43
MatrixInjector.performInjectionOptionTest
def performInjectionOptionTest(opt)
Definition: MatrixInjector.py:12
MatrixInjector.MatrixInjector.submit
def submit(self)
Definition: MatrixInjector.py:624
MatrixInjector.MatrixInjector.get_wmsplit
def get_wmsplit()
Definition: MatrixInjector.py:176
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
MatrixInjector.MatrixInjector.label
label
Definition: MatrixInjector.py:87
MatrixInjector.MatrixInjector.numberEventsInLuminosityBlock
numberEventsInLuminosityBlock
Definition: MatrixInjector.py:59
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
genParticles_cff.map
map
Definition: genParticles_cff.py:11
MatrixInjector.MatrixInjector.version
version
Definition: MatrixInjector.py:55
MatrixInjector.MatrixInjector.user
user
Definition: MatrixInjector.py:85
MatrixInjector.MatrixInjector.defaultChain
defaultChain
Definition: MatrixInjector.py:101
MatrixInjector.MatrixInjector.dqmgui
dqmgui
Definition: MatrixInjector.py:46
python.rootplot.root2matplotlib.replace
def replace(string, replacements)
Definition: root2matplotlib.py:444