CMS 3D CMS Logo

MatrixInjector.py
Go to the documentation of this file.
1 from __future__ import print_function
2 import sys
3 import json
4 import os
5 import copy
6 import multiprocessing
7 import time
8 import re
9 
# Longest workflow name (``<user>_<RequestString>``) considered acceptable for
# submission; longer names are listed by MatrixInjector.submit() in test mode.
MAXWORKFLOWLENGTH = 81


def performInjectionOptionTest(opt):
    """Validate and normalise the wmcontrol-related command-line options.

    *opt* is modified in place:

    * ``opt.show`` set: injection is impossible, exit with status -1.
    * ``opt.wmcontrol == 'init'``: test mode, ``opt.nProcs`` is set to 0.
    * ``opt.wmcontrol == 'test'``: workflows were already created and are only
      dry-run, ``opt.dryRun`` is set to True.
    * ``opt.wmcontrol == 'submit'`` with ``opt.nProcs == 0``: the workflows
      would not be run, exit with status -1.
    * ``opt.wmcontrol == 'force'``: expert setting, ``opt.dryRun`` is set to True.

    :param opt: options object exposing ``show``, ``wmcontrol``, ``nProcs``
        and ``dryRun``.
    """
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the workflows.')
        sys.exit(-1)
    # the wmcontrol values below are mutually exclusive
    if opt.wmcontrol == 'init':
        # init means it'll be in test mode
        opt.nProcs = 0
    elif opt.wmcontrol == 'test':
        # means the wf were created already, and we just dryRun it.
        opt.dryRun = True
    elif opt.wmcontrol == 'submit' and opt.nProcs == 0:
        print('Not injecting to wmagent in -j 0 mode. Need to run the workflows.')
        sys.exit(-1)
    elif opt.wmcontrol == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
28 
def upload_to_couch_oneArg(arguments):
    """Upload one configuration to CouchDB from a single packed argument.

    Exists so the upload can be dispatched through ``multiprocessing.Pool.map``,
    which passes exactly one argument per call.

    :param arguments: tuple ``(filePath, labelInCouch, user, group, where)``.
    :returns: whatever ``modules.wma.upload_to_couch`` returns (the cache ID).
    """
    from modules.wma import upload_to_couch
    filePath, labelInCouch, user, group, where = arguments
    return upload_to_couch(filePath, labelInCouch, user, group,
                           test_mode=False, url=where)
39 
40 
class MatrixInjector(object):
    """Build TaskChain requests from matrix workflows and inject them.

    ``prepare`` turns each workflow directory into a request dictionary stored
    in ``self.chainDicts``; ``upload`` pushes the referenced configuration
    files to CouchDB; ``submit`` sends the requests to the request manager,
    or only prints them in test mode.
    """

    def __init__(self, opt, mode='init', options=''):
        """Collect injection settings from the options and the environment.

        :param opt: parsed command-line options; reads ``keep``,
            ``memoryOffset``, ``memPerCore``, ``numberEventsInLuminosityBlock``,
            ``batchName``, ``testbed``, ``label`` and ``nThreads``.
        :param mode: wmcontrol mode; anything but ``'submit'`` or ``'force'``
            runs in test mode (fake uploads, requests only printed).
        :param options: comma-separated overrides: ``dqm:<url>`` sets the DQM
            GUI, ``wma:<host>`` the request manager host.

        Exits with status -18 when ``$WMCORE_ROOT`` is unset outside test mode.
        """
        self.count = 1040  # last fake couch ID handed out in test mode

        self.dqmgui = None
        self.wmagent = None
        for k in options.split(','):
            if k.startswith('dqm:'):
                self.dqmgui = k.split(':', 1)[-1]
            elif k.startswith('wma:'):
                self.wmagent = k.split(':', 1)[-1]

        self.testMode = ((mode != 'submit') and (mode != 'force'))
        self.version = 1
        self.keep = opt.keep
        self.memoryOffset = opt.memoryOffset
        self.memPerCore = opt.memPerCore
        self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
        self.batchName = ''
        self.batchTime = str(int(time.time()))
        if opt.batchName:
            self.batchName = '__' + opt.batchName + '-' + self.batchTime

        # request manager host: 'wma:' option, then $WMAGENT_REQMGR, then the
        # default production or testbed cmsweb instance
        if not self.wmagent:
            self.wmagent = os.getenv('WMAGENT_REQMGR')
        if not self.wmagent:
            if opt.testbed:
                self.wmagent = 'cmsweb-testbed.cern.ch'
            else:
                self.wmagent = 'cmsweb.cern.ch'
        # defaultChain below always needs DbsUrl; it used to be assigned only
        # when the default host was picked, so a 'wma:' option or
        # $WMAGENT_REQMGR ended in an AttributeError.
        if opt.testbed:
            self.DbsUrl = "https://" + self.wmagent + "/dbs/int/global/DBSReader"
        else:
            self.DbsUrl = "https://" + self.wmagent + "/dbs/prod/global/DBSReader"

        if not self.dqmgui:
            self.dqmgui = "https://cmsweb.cern.ch/dqm/relval"
        # couch stuff
        self.couch = 'https://' + self.wmagent + '/couchdb'
        # config file name -> couch ID, so that we do not upload like crazy, and recycle cfgs
        self.couchCache = {}
        self.user = os.getenv('USER')
        self.group = 'ppd'
        self.label = 'RelValSet_' + os.getenv('CMSSW_VERSION').replace('-', '') + '_v' + str(self.version)
        self.speciallabel = ''
        if opt.label:
            self.speciallabel = '_' + opt.label
        self.longWFName = []  # candidate workflow names longer than MAXWORKFLOWLENGTH

        if not os.getenv('WMCORE_ROOT'):
            print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-18)
        else:
            print('\n\tFound wmclient\n')

        self.defaultChain = {
            "RequestType": "TaskChain",  # this is how we handle relvals
            "SubRequestType": "RelVal",  # this is how we handle relvals, now that TaskChain is also used for central MC production
            "RequestPriority": 500000,
            "Requestor": self.user,  # Person responsible
            "Group": self.group,  # group for the request
            "CMSSWVersion": os.getenv('CMSSW_VERSION'),  # CMSSW Version (used for all tasks in chain)
            "Campaign": os.getenv('CMSSW_VERSION'),  # = AcquisitionEra, will be reset later to the one of first task, will both be the CMSSW_VERSION
            "ScramArch": os.getenv('SCRAM_ARCH'),  # Scram Arch (used for all tasks in chain)
            "ProcessingVersion": self.version,  # Processing Version (used for all tasks in chain)
            "GlobalTag": None,  # Global Tag (overridden per task)
            "ConfigCacheUrl": self.couch,  # URL of CouchDB containing Config Cache
            "DbsUrl": self.DbsUrl,
            # - Will contain all configs for all Tasks
            # "SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
            "TaskChain": None,  # Define number of tasks in chain.
            "nowmTasklist": [],  # a list of tasks as we put them in
            "Multicore": 1,  # do not set multicore for the whole chain
            "Memory": 3000,
            "SizePerEvent": 1234,
            "TimePerEvent": 10,
            "PrepID": os.getenv('CMSSW_VERSION')
        }

        # merged into the request-level dict when a HARVEST task is chained
        self.defaultHarvest = {
            "EnableHarvesting": "True",
            "DQMUploadUrl": self.dqmgui,
            "DQMConfigCacheID": None,
            "Multicore": 1  # hardcode Multicore to be 1 for Harvest
        }

        # template for a first task that generates events from scratch
        self.defaultScratch = {
            "TaskName": None,  # Task Name
            "ConfigCacheID": None,  # Generator Config id
            "GlobalTag": None,
            "SplittingAlgo": "EventBased",  # Splitting Algorithm
            "EventsPerJob": None,  # Size of jobs in terms of splitting algorithm
            "EventsPerLumi": None,
            "RequestNumEvents": None,  # Total number of events to generate
            "Seeding": "AutomaticSeeding",  # Random seeding method
            "PrimaryDataset": None,  # Primary Dataset to be created
            "nowmIO": {},
            "Multicore": opt.nThreads,  # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
            "KeepOutput": False
        }
        # template for a task reading an input dataset
        self.defaultInput = {
            "TaskName": "DigiHLT",  # Task Name
            "ConfigCacheID": None,  # Processing Config id
            "GlobalTag": None,
            "InputDataset": None,  # Input Dataset to be processed
            "SplittingAlgo": "LumiBased",  # Splitting Algorithm
            "LumisPerJob": 10,  # Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "Multicore": opt.nThreads,  # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
            "KeepOutput": False
        }
        # template for a task reading the output of a previous task
        self.defaultTask = {
            "TaskName": None,  # Task Name
            "InputTask": None,  # Input Task Name (Task Name field of a previous Task entry)
            "InputFromOutputModule": None,  # OutputModule name in the input task that will provide files to process
            "ConfigCacheID": None,  # Processing Config id
            "GlobalTag": None,
            "SplittingAlgo": "LumiBased",  # Splitting Algorithm
            "LumisPerJob": 10,  # Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "Multicore": opt.nThreads,  # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
            "KeepOutput": False
        }

        self.chainDicts = {}  # workflow number -> request dictionary, filled by prepare()

    @staticmethod
    def _stepSplitting():
        """Return a fresh dict of per-step ``LumisPerJob`` overrides, keyed by step name."""
        wmsplit = {
            'DIGIHI': 5,
            'RECOHI': 5,
            'HLTD': 5,
            'RECODreHLT': 2,
            'DIGIPU': 4,
            'DIGIPU1': 4,
            'RECOPU1': 1,
            'DIGIUP15_PU50': 1,
            'RECOUP15_PU50': 1,
            'DIGIUP15_PU25': 1,
            'RECOUP15_PU25': 1,
            'DIGIUP15_PU25HS': 1,
            'RECOUP15_PU25HS': 1,
            'DIGIHIMIX': 5,
            'RECOHIMIX': 5,
            'RECODSplit': 1,
            'SingleMuPt10_UP15_ID': 1,
            'DIGIUP15_ID': 1,
            'RECOUP15_ID': 1,
            'TTbar_13_ID': 1,
            'SingleMuPt10FS_ID': 1,
            'TTbarFS_ID': 1,
            'RECODR2_50nsreHLT': 5,
            'RECODR2_25nsreHLT': 5,
            'RECODR2_2016reHLT': 5,
            'RECODR2_50nsreHLT_HIPM': 5,
            'RECODR2_25nsreHLT_HIPM': 5,
            'RECODR2_2016reHLT_HIPM': 1,
            'RECODR2_2016reHLT_skimSingleMu': 1,
            'RECODR2_2016reHLT_skimDoubleEG': 1,
            'RECODR2_2016reHLT_skimMuonEG': 1,
            'RECODR2_2016reHLT_skimJetHT': 1,
            'RECODR2_2016reHLT_skimMET': 1,
            'RECODR2_2016reHLT_skimSinglePh': 1,
            'RECODR2_2016reHLT_skimMuOnia': 1,
            'RECODR2_2016reHLT_skimSingleMu_HIPM': 1,
            'RECODR2_2016reHLT_skimDoubleEG_HIPM': 1,
            'RECODR2_2016reHLT_skimMuonEG_HIPM': 1,
            'RECODR2_2016reHLT_skimJetHT_HIPM': 1,
            'RECODR2_2016reHLT_skimMET_HIPM': 1,
            'RECODR2_2016reHLT_skimSinglePh_HIPM': 1,
            'RECODR2_2016reHLT_skimMuOnia_HIPM': 1,
            'RECODR2_2017reHLT_Prompt': 1,
            'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2017reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2017reHLT_skimMET_Prompt': 1,
            'RECODR2_2017reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2017reHLT_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_Prompt': 1,
            'RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt': 1,
            'RECODR2_2018reHLT_skimMET_Prompt': 1,
            'RECODR2_2018reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Prompt': 1,
            'RECODR2_2018reHLT_skimCharmonium_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Prompt': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Prompt': 1,
            'RECODR2_2018reHLT_ZBPrompt': 1,
            'RECODR2_2018reHLT_Offline': 1,
            'RECODR2_2018reHLT_skimSingleMu_Offline_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline': 1,
            'RECODR2_2018reHLT_skimMET_Offline': 1,
            'RECODR2_2018reHLT_skimMuOnia_Offline': 1,
            'RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Offline': 1,
            'RECODR2_2018reHLT_skimCharmonium_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Offline': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Offline': 1,
            'RECODR2_2018reHLT_ZBOffline': 1,
            'HLTDR2_50ns': 1,
            'HLTDR2_25ns': 1,
            'HLTDR2_2016': 1,
            'HLTDR2_2017': 1,
            'HLTDR2_2018': 1,
            'HLTDR2_2018_BadHcalMitig': 1,
            'Hadronizer': 1,
            'DIGIUP15': 1,
            'RECOUP15': 1,
            'RECOAODUP15': 5,
            'DBLMINIAODMCUP15NODQM': 5,
            'DigiFull': 5,
            'RecoFull': 5,
            'DigiFullPU': 1,
            'RecoFullPU': 1,
            'RECOHID11': 1,
            'DIGIUP17': 1,
            'RECOUP17': 1,
            'DIGIUP17_PU25': 1,
            'RECOUP17_PU25': 1,
            'DIGICOS_UP16': 1,
            'RECOCOS_UP16': 1,
            'DIGICOS_UP17': 1,
            'RECOCOS_UP17': 1,
            'DIGICOS_UP18': 1,
            'RECOCOS_UP18': 1,
            'DIGICOS_UP21': 1,
            'RECOCOS_UP21': 1,
            'HYBRIDRepackHI2015VR': 1,
            'HYBRIDZSHI2015': 1,
            'RECOHID15': 1,
            'RECOHID18': 1,
        }
        # one digi/reco pileup step pair per 2026 geometry
        for geometry in ('D17', 'D35', 'D41', 'D43', 'D44', 'D45', 'D46', 'D47',
                         'D48', 'D49', 'D50', 'D51', 'D52', 'D57', 'D58'):
            wmsplit['DigiFullTriggerPU_2026%sPU' % geometry] = 1
            wmsplit['RecoFullGlobalPU_2026%sPU' % geometry] = 1
        return wmsplit

    @staticmethod
    def _loadIO(wfDir, step):
        """Return the parsed ``<wfDir>/<step>.io`` JSON file, or None (after printing why) if unreadable."""
        ioPath = '%s/%s.io' % (wfDir, step)
        try:
            with open(ioPath) as ioFile:
                return json.load(ioFile)
        except (IOError, OSError, ValueError):
            print("Failed to find", ioPath, ".The workflows were probably not run on cfg not created")
            return None

    def prepare(self, mReader, directories, mode='init'):
        """Build one TaskChain request dictionary per workflow.

        :param mReader: matrix reader whose ``workFlowSteps`` maps
            ``(num, prefix)`` to ``(num, name, commands, stepList)``; a
            ``commands`` entry that is not a string (or whose step name
            contains ``INPUT``) describes the input dataset of the next step.
        :param directories: maps a workflow number to the directory holding
            the ``<step>.py`` configurations and ``<step>.io`` JSON files.
        :param mode: unused; kept for interface compatibility.
        :returns: 0 on success; -15 when a ``.io`` file cannot be read; -12
            when a from-scratch first step has no ``--relval`` option.

        Each finished request is stored in ``self.chainDicts[num]``.
        """
        wmsplit = self._stepSplitting()

        for (n, wfDir) in directories.items():
            chainDict = copy.deepcopy(self.defaultChain)
            print("inspecting", wfDir)
            nextHasDSInput = None
            for (x, s) in mReader.workFlowSteps.items():
                # x has the format (num, prefix)
                # s has the format (num, name, commands, stepList)
                if x[0] != n:
                    continue
                splitForThisWf = None
                thisLabel = self.speciallabel
                if any("HARVESTGEN" in step for step in s[3]):
                    chainDict['TimePerEvent'] = 0.01
                    thisLabel = thisLabel + "_gen"
                # for double miniAOD test
                if any("DBLMINIAODMCUP15NODQM" in step for step in s[3]):
                    thisLabel = thisLabel + "_dblMiniAOD"
                processStrPrefix = ''
                setPrimaryDs = None
                nanoedmGT = ''
                for index, step in enumerate(s[3]):
                    command = s[2][index]
                    if 'INPUT' in step or (not isinstance(command, str)):
                        # input dataset description: consumed by the next real step
                        nextHasDSInput = command
                        continue

                    if index == 0:
                        # first step and not input -> gen part
                        task = copy.deepcopy(self.defaultScratch)
                        chainDict['nowmTasklist'].append(task)
                        task['nowmIO'] = self._loadIO(wfDir, step)
                        if task['nowmIO'] is None:
                            return -15

                        task['PrimaryDataset'] = 'RelVal' + s[1].split('+')[0]
                        if '--relval' not in command:
                            print('Impossible to create task from scratch without splitting information with --relval')
                            return -12
                        arg = command.split()
                        # value after the last '--relval': "<total events>,<events per job>"
                        ns = list(map(int, arg[len(arg) - arg[::-1].index('--relval')].split(',')))
                        task['RequestNumEvents'] = ns[0]
                        task['EventsPerJob'] = ns[1]
                        task['EventsPerLumi'] = ns[1]
                        # overwrite EventsPerLumi if numberEventsInLuminosityBlock is set in cmsDriver
                        if 'numberEventsInLuminosityBlock' in command:
                            nEventsInLuminosityBlock = re.findall(
                                r'process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\(([ 0-9 ]*)\)',
                                command, re.DOTALL)
                            # no match (differently spelled setting) means no override
                            if (nEventsInLuminosityBlock
                                    and nEventsInLuminosityBlock[-1].isdigit()
                                    and int(nEventsInLuminosityBlock[-1]) < ns[1]):
                                task['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
                        if 0 < self.numberEventsInLuminosityBlock <= ns[1]:
                            task['EventsPerLumi'] = self.numberEventsInLuminosityBlock
                        if 'FASTSIM' in command or '--fast' in command:
                            thisLabel += '_FastSim'
                        if 'lhe' in command:
                            task['LheInputFiles'] = True

                    elif nextHasDSInput:
                        # step reading the dataset described by the previous INPUT entry
                        task = copy.deepcopy(self.defaultInput)
                        chainDict['nowmTasklist'].append(task)
                        task['nowmIO'] = self._loadIO(wfDir, step)
                        if task['nowmIO'] is None:
                            return -15
                        task['InputDataset'] = nextHasDSInput.dataSet
                        if 'DQMHLTonRAWAOD' in step:
                            task['IncludeParents'] = True
                        splitForThisWf = nextHasDSInput.split
                        task['LumisPerJob'] = splitForThisWf
                        if step in wmsplit:
                            task['LumisPerJob'] = wmsplit[step]
                        # get the run numbers or #events
                        if len(nextHasDSInput.run):
                            task['RunWhitelist'] = nextHasDSInput.run
                        if len(nextHasDSInput.ls):
                            task['LumiList'] = nextHasDSInput.ls
                        if '--data' in command and nextHasDSInput.label:
                            thisLabel += '_RelVal_%s' % nextHasDSInput.label
                        if 'filter' in task['nowmIO']:
                            print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
                            processStrPrefix = 'PU_'
                            # PrimaryDataset is set from setPrimaryDs below for this and later tasks
                            setPrimaryDs = 'RelVal' + s[1].split('+')[0]
                        nextHasDSInput = None

                    else:
                        # not first step and no inputDS
                        task = copy.deepcopy(self.defaultTask)
                        chainDict['nowmTasklist'].append(task)
                        task['nowmIO'] = self._loadIO(wfDir, step)
                        if task['nowmIO'] is None:
                            return -15
                        if splitForThisWf:
                            task['LumisPerJob'] = splitForThisWf
                        if step in wmsplit:
                            task['LumisPerJob'] = wmsplit[step]

                    # change LumisPerJob for Hadronizer steps.
                    if 'Hadronizer' in step:
                        task['LumisPerJob'] = wmsplit['Hadronizer']

                    task['TaskName'] = step
                    if setPrimaryDs:
                        task['PrimaryDataset'] = setPrimaryDs
                    task['ConfigCacheID'] = '%s/%s.py' % (wfDir, step)
                    globalTag = task['nowmIO']['GT']
                    task['GlobalTag'] = globalTag  # copy to the proper parameter name
                    chainDict['GlobalTag'] = globalTag  # set in general to the last one of the chain
                    if 'NANOEDM' in step:
                        nanoedmGT = globalTag
                    if 'NANOMERGE' in step:
                        chainDict['GlobalTag'] = nanoedmGT
                    if 'pileup' in task['nowmIO']:
                        task['MCPileup'] = task['nowmIO']['pileup']
                    commandArgs = command.split()
                    # catch --pileup (scenario) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
                    if '--pileup ' in command:
                        processStrPrefix = 'PU_'  # take care of pu overlay done with GEN-SIM mixing
                        scenario = commandArgs[commandArgs.index('--pileup') + 1]
                        if scenario.find('25ns') > 0:
                            processStrPrefix = 'PU25ns_'
                        elif scenario.find('50ns') > 0:
                            processStrPrefix = 'PU50ns_'
                    # take care of pu overlay done with DIGI mixing of premixed events
                    if 'premix_stage2' in command and '--pileup_input' in command:
                        pileupInput = commandArgs[commandArgs.index('--pileup_input') + 1]
                        if pileupInput.find('25ns') > 0:
                            processStrPrefix = 'PUpmx25ns_'
                        elif pileupInput.find('50ns') > 0:
                            processStrPrefix = 'PUpmx50ns_'

                    # AcquisitionEra is the release; ProcessingString = prefix + GT + labels
                    # (NANOMERGE steps use the GT of the preceding NANOEDM step)
                    task['AcquisitionEra'] = chainDict['CMSSWVersion']
                    psGlobalTag = nanoedmGT if 'NANOMERGE' in step else task['GlobalTag']
                    task['ProcessingString'] = processStrPrefix + psGlobalTag.replace('::All', '').replace('-', '_') + thisLabel

                    if self.batchName:
                        task['Campaign'] = task['AcquisitionEra'] + self.batchName

                    # specify different ProcessingString for double miniAOD dataset
                    if 'DBLMINIAODMCUP15NODQM' in step:
                        task['ProcessingString'] = task['ProcessingString'] + '_miniAOD'

                    if task['Multicore']:
                        # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
                        # the number of threads is NO LONGER assumed to be the same for all tasks
                        # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
                        # now change to 1.5GB / additional thread according to discussion:
                        # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
                        task['Memory'] = self.memoryOffset + int(task['Multicore'] - 1) * self.memPerCore
                # end of loop through steps

                chainDict['RequestString'] = 'RV' + chainDict['CMSSWVersion'] + s[1].split('+')[0]
                if processStrPrefix or thisLabel:
                    chainDict['RequestString'] += '_' + processStrPrefix + thisLabel
                # check candidate WF name
                self.candidateWFName = self.user + '_' + chainDict['RequestString']
                if len(self.candidateWFName) > MAXWORKFLOWLENGTH:
                    self.longWFName.append(self.candidateWFName)

                chainDict['PrepID'] = chainDict['CMSSWVersion'] + '__' + self.batchTime + '-' + s[1].split('+')[0]
                if self.batchName:
                    chainDict['PrepID'] = chainDict['CMSSWVersion'] + self.batchName + '-' + s[1].split('+')[0]
                    if 'HIN' in self.batchName:
                        chainDict['SubRequestType'] = "HIRelVal"

            # wrap up for this one: chain every task reading a 'primary' file to
            # the earlier task whose output module produces that file
            tasks = chainDict['nowmTasklist']
            for i_second in reversed(range(len(tasks))):
                t_second = tasks[i_second]
                if 'primary' not in t_second['nowmIO']:
                    continue
                primary = t_second['nowmIO']['primary'][0].replace('file:', '')
                for i_input in reversed(range(i_second)):
                    t_input = tasks[i_input]
                    for (om, o) in t_input['nowmIO'].items():
                        if primary not in o:
                            continue
                        # ad-hoc fix due to restriction in TaskName of 50 characters
                        if len(t_input['TaskName']) > 50:
                            if t_input['TaskName'].find('GenSim') != -1:
                                t_input['TaskName'] = 'GenSimFull'
                            if t_input['TaskName'].find('Hadronizer') != -1:
                                t_input['TaskName'] = 'HadronizerFull'
                        t_second['InputTask'] = t_input['TaskName']
                        t_second['InputFromOutputModule'] = om
                        if t_second['TaskName'].startswith('HARVEST'):
                            # the harvesting info goes in the general dict, not the task one
                            chainDict.update(copy.deepcopy(self.defaultHarvest))
                            if "_RD" in t_second['TaskName']:
                                chainDict['DQMHarvestUnit'] = "multiRun"
                            chainDict['DQMConfigCacheID'] = t_second['ConfigCacheID']
                        break

            # agreed changes for wm injection:
            # - Campaign: *optional* at creation; defaults to AcqEra if possible, otherwise empty.
            # - AcquisitionEra / ProcessingString: *mandatory* at request level, *optional* at
            #   task level during creation, "optional" during assignment.
            # - ProcessingVersion: *optional* during creation (default 1) and assignment.
            # Hence the request-level AcquisitionEra and ProcessingString are taken from
            # the first task, and Campaign follows the AcquisitionEra (plus batch name).
            chainDict['AcquisitionEra'] = tasks[0]['AcquisitionEra']
            chainDict['ProcessingString'] = tasks[0]['ProcessingString']
            chainDict['Campaign'] = chainDict['AcquisitionEra'] + self.batchName

            # clean things up now: keep-output flags, then number the tasks
            if self.keep:
                for i in self.keep:
                    if isinstance(i, int) and i < len(tasks):
                        tasks[i]['KeepOutput'] = True
            itask = 0
            for t in tasks:
                if t['TaskName'].startswith('HARVEST'):
                    continue
                if not self.keep or t['TaskName'] in self.keep:
                    t['KeepOutput'] = True
                if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
                    t['KeepOutput'] = False
                t.pop('nowmIO')
                itask += 1
                chainDict['Task%d' % itask] = t

            # provide the number of tasks
            chainDict['TaskChain'] = itask

            chainDict.pop('nowmTasklist')
            self.chainDicts[n] = chainDict

        return 0

    def uploadConf(self, filePath, label, where):
        """Upload one configuration file to CouchDB and return its cache ID.

        In test mode nothing is uploaded and an increasing fake ID is returned.
        Otherwise uploads are cached by file name in ``self.couchCache``, so a
        file name already uploaded is not uploaded again.

        :param filePath: path of the configuration file.
        :param label: suffix appended to ``self.label`` to form the couch label.
        :param where: CouchDB URL.
        :returns: the couch cache ID.

        Exits with status -16 when the wmcontrol modules cannot be imported.
        """
        labelInCouch = self.label + '_' + label
        cacheName = filePath.split('/')[-1]
        if self.testMode:
            self.count += 1
            print('\tFake upload of', filePath, 'to couch with label', labelInCouch)
            return self.count

        try:
            # availability check only; the upload itself runs in upload_to_couch_oneArg
            from modules.wma import upload_to_couch, DATABASE_NAME
        except Exception:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            print('\n\t QUIT\n')
            sys.exit(-16)

        if cacheName in self.couchCache:
            print("Not re-uploading", filePath, "to", where, "for", label)
            return self.couchCache[cacheName]

        print("Loading", filePath, "to", where, "for", label)
        # the upload runs in a one-worker pool; close and join it so no
        # worker process is leaked per uploaded configuration
        pool = multiprocessing.Pool(1)
        try:
            cacheIds = pool.map(upload_to_couch_oneArg,
                                [(filePath, labelInCouch, self.user, self.group, where)])
        finally:
            pool.close()
            pool.join()
        cacheId = cacheIds[0]
        self.couchCache[cacheName] = cacheId
        return cacheId

    def upload(self):
        """Upload every task configuration, and the harvesting one, to CouchDB.

        The file paths stored as ``ConfigCacheID`` (per task) and
        ``DQMConfigCacheID`` (per request) in ``self.chainDicts`` are replaced
        by the IDs returned from uploadConf().
        """
        for (n, d) in self.chainDicts.items():
            for it in d:
                if it.startswith("Task") and it != 'TaskChain':
                    couchID = self.uploadConf(d[it]['ConfigCacheID'],
                                              str(n) + d[it]['TaskName'],
                                              d['ConfigCacheUrl'])
                    print(d[it]['ConfigCacheID'], " uploaded to couchDB for", str(n), "with ID", couchID)
                    d[it]['ConfigCacheID'] = couchID
                if it == 'DQMConfigCacheID':
                    couchID = self.uploadConf(d['DQMConfigCacheID'],
                                              str(n) + 'harvesting',
                                              d['ConfigCacheUrl'])
                    print(d['DQMConfigCacheID'], "uploaded to couchDB for", str(n), "with ID", couchID)
                    d['DQMConfigCacheID'] = couchID

    def submit(self):
        """Submit every prepared request, or only print it in test mode.

        Outside test mode each request is printed, sent with ``makeRequest``
        and followed by ``random_sleep()``; a missing wmcontrol installation
        exits with status -17. In test mode, workflow names longer than
        MAXWORKFLOWLENGTH are listed at the end.
        """
        try:
            from modules.wma import makeRequest, approveRequest
            from wmcontrol import random_sleep
            print('\n\tFound wmcontrol\n')
        except Exception:
            # best effort: test mode only prints the requests
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-17)

        import pprint
        for (n, d) in self.chainDicts.items():
            if self.testMode:
                print("Only viewing request", n)
                pprint.pprint(d)
            else:
                # submit to wmagent each dict
                print("For eyes before submitting", n)
                pprint.pprint(d)
                print("Submitting", n, "...........")
                makeRequest(self.wmagent, d, encodeDict=True)
                print("...........", n, "submitted")
                random_sleep()
        if self.testMode and len(self.longWFName) > 0:
            print("\n*** WARNING: " + str(len(self.longWFName)) + " workflows have too long names for submission (>" + str(MAXWORKFLOWLENGTH) + " characters) ***")
            print('\n'.join(self.longWFName))
MatrixInjector.MatrixInjector.upload
def upload(self)
Definition: MatrixInjector.py:616
MatrixInjector.MatrixInjector.longWFName
longWFName
Definition: MatrixInjector.py:88
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
resolutioncreator_cfi.object
object
Definition: resolutioncreator_cfi.py:4
MatrixInjector.MatrixInjector
Definition: MatrixInjector.py:41
MatrixInjector.MatrixInjector.defaultInput
defaultInput
Definition: MatrixInjector.py:143
MatrixInjector.MatrixInjector.keep
keep
Definition: MatrixInjector.py:56
MatrixInjector.MatrixInjector.memoryOffset
memoryOffset
Definition: MatrixInjector.py:57
MatrixInjector.MatrixInjector.couch
couch
Definition: MatrixInjector.py:79
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
MatrixInjector.MatrixInjector.DbsUrl
DbsUrl
Definition: MatrixInjector.py:71
MatrixInjector.MatrixInjector.couchCache
couchCache
Definition: MatrixInjector.py:81
cms::dd::split
std::vector< std::string_view > split(std::string_view, const char *)
spr::find
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
MatrixInjector.upload_to_couch_oneArg
def upload_to_couch_oneArg(arguments)
Definition: MatrixInjector.py:29
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
MatrixInjector.MatrixInjector.wmagent
wmagent
Definition: MatrixInjector.py:47
MatrixInjector.MatrixInjector.batchTime
batchTime
Definition: MatrixInjector.py:61
MatrixInjector.MatrixInjector.candidateWFName
candidateWFName
Definition: MatrixInjector.py:487
MatrixInjector.MatrixInjector.speciallabel
speciallabel
Definition: MatrixInjector.py:85
contentValuesCheck.values
values
Definition: contentValuesCheck.py:38
MatrixInjector.MatrixInjector.uploadConf
def uploadConf(self, filePath, label, where)
the info are not in the task specific dict but in the general dict t_input.update(copy....
Definition: MatrixInjector.py:589
str
#define str(s)
Definition: TestProcessor.cc:48
MatrixInjector.MatrixInjector.defaultHarvest
defaultHarvest
Definition: MatrixInjector.py:122
MatrixInjector.MatrixInjector.prepare
def prepare(self, mReader, directories, mode='init')
Definition: MatrixInjector.py:170
MatrixInjector.MatrixInjector.group
group
Definition: MatrixInjector.py:83
MatrixInjector.MatrixInjector.chainDicts
chainDicts
Definition: MatrixInjector.py:167
MatrixInjector.MatrixInjector.defaultTask
defaultTask
Definition: MatrixInjector.py:154
MatrixInjector.MatrixInjector.testMode
testMode
Definition: MatrixInjector.py:54
mps_setup.append
append
Definition: mps_setup.py:85
MatrixInjector.MatrixInjector.batchName
batchName
Definition: MatrixInjector.py:60
MatrixInjector.MatrixInjector.count
count
Definition: MatrixInjector.py:44
MatrixInjector.MatrixInjector.defaultScratch
defaultScratch
Definition: MatrixInjector.py:129
createfilelist.int
int
Definition: createfilelist.py:10
MatrixInjector.MatrixInjector.memPerCore
memPerCore
Definition: MatrixInjector.py:58
MatrixInjector.MatrixInjector.__init__
def __init__(self, opt, mode='init', options='')
Definition: MatrixInjector.py:43
MatrixInjector.performInjectionOptionTest
def performInjectionOptionTest(opt)
Definition: MatrixInjector.py:12
MatrixInjector.MatrixInjector.submit
def submit(self)
Definition: MatrixInjector.py:636
edm::print
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
MatrixInjector.MatrixInjector.label
label
Definition: MatrixInjector.py:84
MatrixInjector.MatrixInjector.numberEventsInLuminosityBlock
numberEventsInLuminosityBlock
Definition: MatrixInjector.py:59
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
list
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
genParticles_cff.map
map
Definition: genParticles_cff.py:11
MatrixInjector.MatrixInjector.version
version
Definition: MatrixInjector.py:55
MatrixInjector.MatrixInjector.user
user
Definition: MatrixInjector.py:82
MatrixInjector.MatrixInjector.defaultChain
defaultChain
Definition: MatrixInjector.py:98
MatrixInjector.MatrixInjector.dqmgui
dqmgui
Definition: MatrixInjector.py:46
python.rootplot.root2matplotlib.replace
def replace(string, replacements)
Definition: root2matplotlib.py:444