CMS 3D CMS Logo

MatrixInjector.py
Go to the documentation of this file.
1 from __future__ import print_function
2 import sys
3 import json
4 import os
5 import copy
6 import multiprocessing
7 import time
8 import re
9 
# Upper bound on the length of a workflow name of the form
# "<user>_<RequestString>": prepare() compares each candidate name against it,
# and submit() (in test mode) warns about the names recorded as too long.
10 MAXWORKFLOWLENGTH = 81
11 
13  if opt.show:
14  print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
15  sys.exit(-1)
16  if opt.wmcontrol=='init':
17  #init means it'll be in test mode
18  opt.nProcs=0
19  if opt.wmcontrol=='test':
20  #means the wf were created already, and we just dryRun it.
21  opt.dryRun=True
22  if opt.wmcontrol=='submit' and opt.nProcs==0:
23  print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
24  sys.exit(-1)
25  if opt.wmcontrol=='force':
26  print("This is an expert setting, you'd better know what you're doing")
27  opt.dryRun=True
28 
def upload_to_couch_oneArg(arguments):
    """Upload one configuration to CouchDB from a single packed argument.

    ``multiprocessing.Pool.map`` hands exactly one object to its worker, so
    the caller packs the upload parameters into the 5-tuple
    ``(filePath, labelInCouch, user, group, where)``.  This helper unpacks it
    and forwards everything to ``modules.wma.upload_to_couch`` with
    ``test_mode=False`` and ``where`` as the target URL.

    :param arguments: the 5-tuple described above.
    :returns: whatever ``upload_to_couch`` returns (used by the caller as the
        config cache id).
    """
    from modules.wma import upload_to_couch
    cfgPath, couchLabel, requestor, requestGroup, couchUrl = arguments
    return upload_to_couch(cfgPath, couchLabel, requestor, requestGroup,
                           test_mode=False, url=couchUrl)
39 
40 
42 
43  def __init__(self,opt,mode='init',options=''):
44  self.count=1040
45 
46  self.dqmgui=None
47  self.wmagent=None
48  for k in options.split(','):
49  if k.startswith('dqm:'):
50  self.dqmgui=k.split(':',1)[-1]
51  elif k.startswith('wma:'):
52  self.wmagent=k.split(':',1)[-1]
53 
54  self.testMode=((mode!='submit') and (mode!='force'))
55  self.version =1
56  self.keep = opt.keep
57  self.memoryOffset = opt.memoryOffset
58  self.memPerCore = opt.memPerCore
59  self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
60  self.numberOfStreams = 0
61  if(opt.nStreams>0):
62  self.numberOfStreams = opt.nStreams
63  self.batchName = ''
64  self.batchTime = str(int(time.time()))
65  if(opt.batchName):
66  self.batchName = '__'+opt.batchName+'-'+self.batchTime
67 
68 
72  self.RequiresGPU = opt.gpu
73  self.GPUMemoryMB = opt.GPUMemoryMB
74  self.CUDACapabilities = opt.CUDACapabilities
75  self.CUDARuntime = opt.CUDARuntime
76  # optional
77  self.GPUName = opt.GPUName
78  self.CUDADriverVersion = opt.CUDADriverVersion
79  self.CUDARuntimeVersion = opt.CUDARuntimeVersion
80 
81  # WMagent url
82  if not self.wmagent:
83  # Overwrite with env variable
84  self.wmagent = os.getenv('WMAGENT_REQMGR')
85 
86  if not self.wmagent:
87  # Default values
88  if not opt.testbed:
89  self.wmagent = 'cmsweb.cern.ch'
90  else:
91  self.wmagent = 'cmsweb-testbed.cern.ch'
92 
93  # DBSReader url
94  if opt.dbsUrl is not None:
95  self.DbsUrl = opt.dbsUrl
96  elif os.getenv('CMS_DBSREADER_URL') is not None:
97  self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
98  else:
99  # Default values
100  if not opt.testbed:
101  self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
102  else:
103  self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"
104 
105  if not self.dqmgui:
106  self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
107  #couch stuff
108  self.couch = 'https://'+self.wmagent+'/couchdb'
109 # self.couchDB = 'reqmgr_config_cache'
110  self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
111  self.user = os.getenv('USER')
112  self.group = 'ppd'
113  self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
114  self.speciallabel=''
115  if opt.label:
116  self.speciallabel= '_'+opt.label
117  self.longWFName = []
118 
119  if not os.getenv('WMCORE_ROOT'):
120  print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
121  if not self.testMode:
122  print('\n\t QUIT\n')
123  sys.exit(-18)
124  else:
125  print('\n\tFound wmclient\n')
126 
128  "RequestType" : "TaskChain", #this is how we handle relvals
129  "SubRequestType" : "RelVal", #this is how we handle relvals, now that TaskChain is also used for central MC production
130  "RequestPriority": 500000,
131  "Requestor": self.user, #Person responsible
132  "Group": self.group, #group for the request
133  "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
134  "Campaign": os.getenv('CMSSW_VERSION'), # = AcquisitionEra, will be reset later to the one of first task, will both be the CMSSW_VERSION
135  "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain)
136  "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain)
137  "GlobalTag": None, #Global Tag (overridden per task)
138  "ConfigCacheUrl": self.couch, #URL of CouchDB containing Config Cache
139  "DbsUrl": self.DbsUrl,
140  #- Will contain all configs for all Tasks
141  #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
142  "TaskChain" : None, #Define number of tasks in chain.
143  "nowmTasklist" : [], #a list of tasks as we put them in
144  "Multicore" : 1, # do not set multicore for the whole chain
145  "Memory" : 3000,
146  "SizePerEvent" : 1234,
147  "TimePerEvent" : 10,
148  "PrepID": os.getenv('CMSSW_VERSION')
149  }
150 
152  "EnableHarvesting" : "True",
153  "DQMUploadUrl" : self.dqmgui,
154  "DQMConfigCacheID" : None,
155  "Multicore" : 1 # hardcode Multicore to be 1 for Harvest
156  }
157 
159  "TaskName" : None, #Task Name
160  "ConfigCacheID" : None, #Generator Config id
161  "GlobalTag": None,
162  "SplittingAlgo" : "EventBased", #Splitting Algorithm
163  "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm
164  "EventsPerLumi" : None,
165  "RequestNumEvents" : None, #Total number of events to generate
166  "Seeding" : "AutomaticSeeding", #Random seeding method
167  "PrimaryDataset" : None, #Primary Dataset to be created
168  "nowmIO": {},
169  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
170  "EventStreams": self.numberOfStreams,
171  "KeepOutput" : False
172  }
174  "TaskName" : "DigiHLT", #Task Name
175  "ConfigCacheID" : None, #Processing Config id
176  "GlobalTag": None,
177  "InputDataset" : None, #Input Dataset to be processed
178  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
179  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
180  "nowmIO": {},
181  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
182  "EventStreams": self.numberOfStreams,
183  "KeepOutput" : False
184  }
185  self.defaultTask={
186  "TaskName" : None, #Task Name
187  "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry)
188  "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process
189  "ConfigCacheID" : None, #Processing Config id
190  "GlobalTag": None,
191  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
192  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
193  "nowmIO": {},
194  "Multicore" : opt.nThreads, # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified
195  "EventStreams": self.numberOfStreams,
196  "KeepOutput" : False,
197  "RequiresGPU" : None,
198  "GPUParams": None
199  }
201  "GPUMemoryMB": self.GPUMemoryMB,
202  "CUDACapabilities": self.CUDACapabilities,
203  "CUDARuntime": self.CUDARuntime
204  }
205  if self.GPUName: self.defaultGPUParams.update({"GPUName": self.GPUName})
206  if self.CUDADriverVersion: self.defaultGPUParams.update({"CUDADriverVersion": self.CUDADriverVersion})
207  if self.CUDARuntimeVersion: self.defaultGPUParams.update({"CUDARuntimeVersion": self.CUDARuntimeVersion})
208 
209  self.chainDicts={}
210 
@staticmethod
def get_wmsplit():
    """
    Return a "wmsplit" dictionary that contains non-default LumisPerJob values.

    Keys are step (task) names, values are the LumisPerJob to use for them.
    The hand-maintained table below is always returned.  On top of it, for
    every 2026 (phase-2) upgrade key containing 'PU', the entries
    'DigiTriggerPU_<key>' and 'RecoGlobalPU_<key>' are set to 1.  If that
    phase-2 part cannot be built (e.g. the upgradeWorkflowComponents import
    fails), a message is printed and only the static table is returned,
    instead of discarding every override.
    """
    wmsplit = {
        'DIGIHI': 5,
        'RECOHI': 5,
        'HLTD': 5,
        'RECODreHLT': 2,
        'DIGIPU': 4,
        'DIGIPU1': 4,
        'RECOPU1': 1,
        'DIGIUP15_PU50': 1,
        'RECOUP15_PU50': 1,
        'DIGIUP15_PU25': 1,
        'RECOUP15_PU25': 1,
        'DIGIUP15_PU25HS': 1,
        'RECOUP15_PU25HS': 1,
        'DIGIHIMIX': 5,
        'RECOHIMIX': 5,
        'RECODSplit': 1,
        'SingleMuPt10_UP15_ID': 1,
        'DIGIUP15_ID': 1,
        'RECOUP15_ID': 1,
        'TTbar_13_ID': 1,
        'SingleMuPt10FS_ID': 1,
        'TTbarFS_ID': 1,
        'RECODR2_50nsreHLT': 5,
        'RECODR2_25nsreHLT': 5,
        'RECODR2_2016reHLT': 5,
        'RECODR2_50nsreHLT_HIPM': 5,
        'RECODR2_25nsreHLT_HIPM': 5,
        'RECODR2_2016reHLT_HIPM': 1,
        'RECODR2_2016reHLT_skimSingleMu': 1,
        'RECODR2_2016reHLT_skimDoubleEG': 1,
        'RECODR2_2016reHLT_skimMuonEG': 1,
        'RECODR2_2016reHLT_skimJetHT': 1,
        'RECODR2_2016reHLT_skimMET': 1,
        'RECODR2_2016reHLT_skimSinglePh': 1,
        'RECODR2_2016reHLT_skimMuOnia': 1,
        'RECODR2_2016reHLT_skimSingleMu_HIPM': 1,
        'RECODR2_2016reHLT_skimDoubleEG_HIPM': 1,
        'RECODR2_2016reHLT_skimMuonEG_HIPM': 1,
        'RECODR2_2016reHLT_skimJetHT_HIPM': 1,
        'RECODR2_2016reHLT_skimMET_HIPM': 1,
        'RECODR2_2016reHLT_skimSinglePh_HIPM': 1,
        'RECODR2_2016reHLT_skimMuOnia_HIPM': 1,
        'RECODR2_2017reHLT_Prompt': 1,
        'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi': 1,
        'RECODR2_2017reHLT_skimDoubleEG_Prompt': 1,
        'RECODR2_2017reHLT_skimMET_Prompt': 1,
        'RECODR2_2017reHLT_skimMuOnia_Prompt': 1,
        'RECODR2_2017reHLT_Prompt_L1TEgDQM': 1,
        'RECODR2_2018reHLT_Prompt': 1,
        'RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi': 1,
        'RECODR2_2018reHLT_skimDoubleEG_Prompt': 1,
        'RECODR2_2018reHLT_skimJetHT_Prompt': 1,
        'RECODR2_2018reHLT_skimMET_Prompt': 1,
        'RECODR2_2018reHLT_skimMuOnia_Prompt': 1,
        'RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM': 1,
        'RECODR2_2018reHLT_skimMuonEG_Prompt': 1,
        'RECODR2_2018reHLT_skimCharmonium_Prompt': 1,
        'RECODR2_2018reHLT_skimJetHT_Prompt_HEfail': 1,
        'RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig': 1,
        'RECODR2_2018reHLTAlCaTkCosmics_Prompt': 1,
        'RECODR2_2018reHLT_skimDisplacedJet_Prompt': 1,
        'RECODR2_2018reHLT_ZBPrompt': 1,
        'RECODR2_2018reHLT_Offline': 1,
        'RECODR2_2018reHLT_skimSingleMu_Offline_Lumi': 1,
        'RECODR2_2018reHLT_skimDoubleEG_Offline': 1,
        'RECODR2_2018reHLT_skimJetHT_Offline': 1,
        'RECODR2_2018reHLT_skimMET_Offline': 1,
        'RECODR2_2018reHLT_skimMuOnia_Offline': 1,
        'RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM': 1,
        'RECODR2_2018reHLT_skimMuonEG_Offline': 1,
        'RECODR2_2018reHLT_skimCharmonium_Offline': 1,
        'RECODR2_2018reHLT_skimJetHT_Offline_HEfail': 1,
        'RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig': 1,
        'RECODR2_2018reHLTAlCaTkCosmics_Offline': 1,
        'RECODR2_2018reHLT_skimDisplacedJet_Offline': 1,
        'RECODR2_2018reHLT_ZBOffline': 1,
        'HLTDR2_50ns': 1,
        'HLTDR2_25ns': 1,
        'HLTDR2_2016': 1,
        'HLTDR2_2017': 1,
        'HLTDR2_2018': 1,
        'HLTDR2_2018_BadHcalMitig': 1,
        'Hadronizer': 1,
        'DIGIUP15': 1,
        'RECOUP15': 1,
        'RECOAODUP15': 5,
        'DBLMINIAODMCUP15NODQM': 5,
        'Digi': 5,
        'Reco': 5,
        'DigiPU': 1,
        'RecoPU': 1,
        'RECOHID11': 1,
        'DIGIUP17': 1,
        'RECOUP17': 1,
        'DIGIUP17_PU25': 1,
        'RECOUP17_PU25': 1,
        'DIGICOS_UP16': 1,
        'RECOCOS_UP16': 1,
        'DIGICOS_UP17': 1,
        'RECOCOS_UP17': 1,
        'DIGICOS_UP18': 1,
        'RECOCOS_UP18': 1,
        'DIGICOS_UP21': 1,
        'RECOCOS_UP21': 1,
        'HYBRIDRepackHI2015VR': 1,
        'HYBRIDZSHI2015': 1,
        'RECOHID15': 1,
        'RECOHID18': 1,
        'HLTDR3_2023': 1,
        'RECONANORUN3_reHLT': 1,
        'HARVESTRUN3': 1,
    }

    # automate for phase 2: built separately so that a failure here (import
    # or lookup problem) does not throw away the static table above.
    try:
        from .upgradeWorkflowComponents import upgradeKeys
        phase2 = {}
        for key in upgradeKeys[2026]:
            if 'PU' not in key:
                continue
            phase2['DigiTriggerPU_' + key] = 1
            phase2['RecoGlobalPU_' + key] = 1
    except Exception as ex:
        print('Exception while building the phase-2 part of the wmsplit dictionary: %s' % (str(ex)))
    else:
        wmsplit.update(phase2)

    return wmsplit
343 
344  def prepare(self, mReader, directories, mode='init'):
345  wmsplit = MatrixInjector.get_wmsplit()
346  acqEra=False
347  for (n,dir) in directories.items():
348  chainDict=copy.deepcopy(self.defaultChain)
349  print("inspecting",dir)
350  nextHasDSInput=None
351  for (x,s) in mReader.workFlowSteps.items():
352  #x has the format (num, prefix)
353  #s has the format (num, name, commands, stepList)
354  if x[0]==n:
355  #print "found",n,s[3]
356  #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
357  index=0
358  splitForThisWf=None
359  thisLabel=self.speciallabel
360  #if 'HARVESTGEN' in s[3]:
361  if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
362  chainDict['TimePerEvent']=0.01
363  thisLabel=thisLabel+"_gen"
364  # for double miniAOD test
365  if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
366  thisLabel=thisLabel+"_dblMiniAOD"
367  processStrPrefix=''
368  setPrimaryDs=None
369  nanoedmGT=''
370  for step in s[3]:
371 
372  if 'INPUT' in step or (not isinstance(s[2][index],str)):
373  nextHasDSInput=s[2][index]
374 
375  else:
376 
377  if (index==0):
378  #first step and not input -> gen part
379  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
380  try:
381  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
382  except:
383  print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
384  return -15
385 
386  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
387  if not '--relval' in s[2][index]:
388  print('Impossible to create task from scratch without splitting information with --relval')
389  return -12
390  else:
391  arg=s[2][index].split()
392  ns=list(map(int,arg[len(arg) - arg[-1::-1].index('--relval')].split(',')))
393  chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
394  chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
395  chainDict['nowmTasklist'][-1]['EventsPerLumi'] = ns[1]
396  #overwrite EventsPerLumi if numberEventsInLuminosityBlock is set in cmsDriver
397  if 'numberEventsInLuminosityBlock' in s[2][index]:
398  nEventsInLuminosityBlock = re.findall('process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\\(([ 0-9 ]*)\\)', s[2][index],re.DOTALL)
399  if nEventsInLuminosityBlock[-1].isdigit() and int(nEventsInLuminosityBlock[-1]) < ns[1]:
400  chainDict['nowmTasklist'][-1]['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
402  chainDict['nowmTasklist'][-1]['EventsPerLumi'] = self.numberEventsInLuminosityBlock
403  if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
404  thisLabel+='_FastSim'
405  if 'lhe' in s[2][index] in s[2][index]:
406  chainDict['nowmTasklist'][-1]['LheInputFiles'] =True
407 
408  elif nextHasDSInput:
409  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
410  try:
411  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
412  except:
413  print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
414  return -15
415  chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
416  if ('DQMHLTonRAWAOD' in step) :
417  chainDict['nowmTasklist'][-1]['IncludeParents']=True
418  splitForThisWf=nextHasDSInput.split
419  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
420  if step in wmsplit:
421  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
422  # get the run numbers or #events
423  if len(nextHasDSInput.run):
424  chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
425  if len(nextHasDSInput.ls):
426  chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
427  #print "what is s",s[2][index]
428  if '--data' in s[2][index] and nextHasDSInput.label:
429  thisLabel+='_RelVal_%s'%nextHasDSInput.label
430  if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
431  print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
432  processStrPrefix='PU_'
433  setPrimaryDs = 'RelVal'+s[1].split('+')[0]
434  if setPrimaryDs:
435  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
436  nextHasDSInput=None
437  if 'GPU' in step and self.RequiresGPU != 'forbidden':
438  chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
439  chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
440  else:
441  #not first step and no inputDS
442  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
443  try:
444  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
445  except:
446  print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
447  return -15
448  if splitForThisWf:
449  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
450  if step in wmsplit:
451  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
452  if 'GPU' in step and self.RequiresGPU != 'forbidden':
453  chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
454  chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
455 
456  # change LumisPerJob for Hadronizer steps.
457  if 'Hadronizer' in step:
458  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']
459 
460  #print step
461  chainDict['nowmTasklist'][-1]['TaskName']=step
462  if setPrimaryDs:
463  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
464  chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
465  chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
466  chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
467  if 'NANOEDM' in step :
468  nanoedmGT = chainDict['nowmTasklist'][-1]['nowmIO']['GT']
469  if 'NANOMERGE' in step :
470  chainDict['GlobalTag'] = nanoedmGT
471  if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
472  chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
473  if '--pileup ' in s[2][index]: # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
474  pileupString = s[2][index].split()[s[2][index].split().index('--pileup')+1]
475  processStrPrefix='PU_' # take care of pu overlay done with GEN-SIM mixing
476  if pileupString.find('25ns') > 0 :
477  processStrPrefix='PU25ns_'
478  elif pileupString.find('50ns') > 0 :
479  processStrPrefix='PU50ns_'
480  elif 'nopu' in pileupString.lower():
481  processStrPrefix=''
482  if 'premix_stage2' in s[2][index] and '--pileup_input' in s[2][index]: # take care of pu overlay done with DIGI mixing of premixed events
483  if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('25ns') > 0 :
484  processStrPrefix='PUpmx25ns_'
485  elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('50ns') > 0 :
486  processStrPrefix='PUpmx50ns_'
487 
488  if acqEra:
489  #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
490  chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
491  chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
492  if 'NANOMERGE' in step :
493  chainDict['ProcessingString'][step]=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
494  else:
495  #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
496  chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
497  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
498  if 'NANOMERGE' in step :
499  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
500 
501  if (self.batchName):
502  chainDict['nowmTasklist'][-1]['Campaign'] = chainDict['nowmTasklist'][-1]['AcquisitionEra']+self.batchName
503 
504  # specify different ProcessingString for double miniAOD dataset
505  if ('DBLMINIAODMCUP15NODQM' in step):
506  chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD'
507 
508  if( chainDict['nowmTasklist'][-1]['Multicore'] ):
509  # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
510  # the number of threads is NO LONGER assumed to be the same for all tasks
511  # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
512  # now change to 1.5GB / additional thread according to discussion:
513  # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
514 # chainDict['nowmTasklist'][-1]['Memory'] = 3000 + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 )*1500
515  chainDict['nowmTasklist'][-1]['Memory'] = self.memoryOffset + int( chainDict['nowmTasklist'][-1]['Multicore'] -1 ) * self.memPerCore
516 
517  index+=1
518  #end of loop through steps
519  chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
520  if processStrPrefix or thisLabel:
521  chainDict['RequestString']+='_'+processStrPrefix+thisLabel
522  #check candidate WF name
523  self.candidateWFName = self.user+'_'+chainDict['RequestString']
524  if (len(self.candidateWFName)>MAXWORKFLOWLENGTH):
526 
527 
528  chainDict['PrepID'] = chainDict['CMSSWVersion']+'__'+self.batchTime+'-'+s[1].split('+')[0]
529  if(self.batchName):
530  chainDict['PrepID'] = chainDict['CMSSWVersion']+self.batchName+'-'+s[1].split('+')[0]
531  if( 'HIN' in self.batchName ):
532  chainDict['SubRequestType'] = "HIRelVal"
533 
534  #wrap up for this one
535  import pprint
536  #print 'wrapping up'
537  #pprint.pprint(chainDict)
538  #loop on the task list
539  for i_second in reversed(range(len(chainDict['nowmTasklist']))):
540  t_second=chainDict['nowmTasklist'][i_second]
541  #print "t_second taskname", t_second['TaskName']
542  if 'primary' in t_second['nowmIO']:
543  #print t_second['nowmIO']['primary']
544  primary=t_second['nowmIO']['primary'][0].replace('file:','')
545  for i_input in reversed(range(0,i_second)):
546  t_input=chainDict['nowmTasklist'][i_input]
547  for (om,o) in t_input['nowmIO'].items():
548  if primary in o:
549  #print "found",primary,"procuced by",om,"of",t_input['TaskName']
550  #ad-hoc fix due to restriction in TaskName of 50 characters
551  if (len(t_input['TaskName'])>50):
552  if (t_input['TaskName'].find('GenSim') != -1):
553  t_input['TaskName'] = 'GenSimFull'
554  if (t_input['TaskName'].find('Hadronizer') != -1):
555  t_input['TaskName'] = 'HadronizerFull'
556  t_second['InputTask'] = t_input['TaskName']
557  t_second['InputFromOutputModule'] = om
558  #print 't_second',pprint.pformat(t_second)
559  if t_second['TaskName'].startswith('HARVEST'):
560  chainDict.update(copy.deepcopy(self.defaultHarvest))
561  if "_RD" in t_second['TaskName']:
562  chainDict['DQMHarvestUnit'] = "multiRun"
563  chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
564 
567  break
568 
569  # agreed changes for wm injection:
570  # - Campaign: *optional* string during creation. It will default to AcqEra value if possible.
571  # Otherwise it will be empty.
572  # - AcquisitionEra: *mandatory* string at request level during creation. *optional* string
573  # at task level during creation. "optional" during assignment.
574  # - ProcessingString: *mandatory* string at request level during creation. *optional* string
575  # at task level during creation. "optional" during assignment.
576  # - ProcessingVersion: *optional* during creation (default 1). *optional* during assignment.
577  #
578  # Which requires following changes here:
579  # - reset Global AcuisitionEra, ProcessingString to be the one in the first task
580  # - and also Campaign to be always the same as the AcquisitionEra
581 
582  if acqEra:
583  chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
584  chainDict['ProcessingString'] = chainDict['ProcessingString'].values()[0]
585  else:
586  chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
587  chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']
588 
589 
591  chainDict['Campaign'] = chainDict['AcquisitionEra']+self.batchName
592 
593 
594  itask=0
595  if self.keep:
596  for i in self.keep:
597  if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
598  chainDict['nowmTasklist'][i]['KeepOutput']=True
599  for (i,t) in enumerate(chainDict['nowmTasklist']):
600  if t['TaskName'].startswith('HARVEST'):
601  continue
602  if not self.keep:
603  t['KeepOutput']=True
604  elif t['TaskName'] in self.keep:
605  t['KeepOutput']=True
606  if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
607  t['KeepOutput']=False
608  t.pop('nowmIO')
609  itask+=1
610  chainDict['Task%d'%(itask)]=t
611 
612 
613 
614 
615 
616 
617  chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
618 
619  chainDict.pop('nowmTasklist')
620  self.chainDicts[n]=chainDict
621 
622 
623  return 0
624 
def uploadConf(self, filePath, label, where):
    """Upload one configuration file to the CouchDB config cache.

    In test mode nothing is uploaded: ``self.count`` is incremented and
    returned as a fake cache id.  Otherwise the file is uploaded at most once
    per file name (the last '/'-separated component of ``filePath``); the id
    returned by the upload is memoised in ``self.couchCache`` and reused on
    later calls with the same file name.

    :param filePath: path of the configuration file to upload.
    :param label: suffix appended to ``self.label`` to form the couch label.
    :param where: URL of the CouchDB instance.
    :returns: the config cache id (a fake integer in test mode).
    :raises SystemExit: (-16) when the wmcontrol modules cannot be imported
        outside test mode.
    """
    labelInCouch = self.label + '_' + label
    cacheName = filePath.split('/')[-1]
    if self.testMode:
        self.count += 1
        print('\tFake upload of', filePath, 'to couch with label', labelInCouch)
        return self.count

    try:
        # Availability check only: the upload itself runs in a worker process
        # through upload_to_couch_oneArg.
        from modules.wma import upload_to_couch, DATABASE_NAME  # noqa: F401
    except ImportError:
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        print('\n\t QUIT\n')
        sys.exit(-16)

    if cacheName in self.couchCache:
        print("Not re-uploading", filePath, "to", where, "for", label)
        return self.couchCache[cacheName]

    print("Loading", filePath, "to", where, "for", label)
    # Single-worker pool; always shut it down so the worker process is not
    # leaked (the original never closed/joined the pool).
    pool = multiprocessing.Pool(1)
    try:
        cacheIds = pool.map(upload_to_couch_oneArg,
                            [(filePath, labelInCouch, self.user, self.group, where)])
    finally:
        pool.close()
        pool.join()
    cacheId = cacheIds[0]
    self.couchCache[cacheName] = cacheId
    return cacheId
651 
def upload(self):
    """Upload every configuration referenced by the prepared requests.

    Walks ``self.chainDicts`` (request number -> request dictionary).  For each
    ``TaskN`` entry (``TaskChain`` itself is skipped) the path stored in its
    ``ConfigCacheID`` is passed to ``self.uploadConf`` with a label made of the
    request number and the task name, and is replaced by the id that call
    returns.  A ``DQMConfigCacheID`` entry is handled the same way with the
    label ``<number>harvesting``.  Entries are visited in dictionary order and
    modified in place.
    """
    for wfNumber, request in self.chainDicts.items():
        wfTag = str(wfNumber)
        for key in request:
            if key == 'DQMConfigCacheID':
                harvestCfg = request['DQMConfigCacheID']
                newId = self.uploadConf(harvestCfg,
                                        wfTag + 'harvesting',
                                        request['ConfigCacheUrl'])
                print(harvestCfg, "uploaded to couchDB for", wfTag, "with ID", newId)
                request['DQMConfigCacheID'] = newId
            elif key != 'TaskChain' and key.startswith("Task"):
                task = request[key]
                taskCfg = task['ConfigCacheID']
                newId = self.uploadConf(taskCfg,
                                        wfTag + task['TaskName'],
                                        request['ConfigCacheUrl'])
                print(taskCfg, " uploaded to couchDB for", wfTag, "with ID", newId)
                task['ConfigCacheID'] = newId
670 
671 
def submit(self):
    """Submit every prepared request to the workload manager (or just show it).

    Imports ``makeRequest``/``approveRequest`` from ``modules.wma`` and
    ``random_sleep`` from ``wmcontrol``.  If that fails, a message is printed
    and, outside test mode, the process exits with -17.

    In test mode each request dictionary in ``self.chainDicts`` is only
    pretty-printed.  Otherwise it is pretty-printed, sent with
    ``makeRequest(self.wmagent, d, encodeDict=True)``, and followed by
    ``random_sleep()``.  Finally, in test mode, the workflow names recorded in
    ``self.longWFName`` are listed as too long for submission.
    """
    try:
        from modules.wma import makeRequest, approveRequest  # noqa: F401
        from wmcontrol import random_sleep
        print('\n\tFound wmcontrol\n')
    except ImportError:
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-17)

    import pprint
    for (n, d) in self.chainDicts.items():
        if self.testMode:
            print("Only viewing request", n)
            # pprint.pprint prints by itself; wrapping it in print() emitted a stray "None"
            pprint.pprint(d)
        else:
            #submit to wmagent each dict
            print("For eyes before submitting", n)
            pprint.pprint(d)
            print("Submitting", n, "...........")
            makeRequest(self.wmagent, d, encodeDict=True)
            print("...........", n, "submitted")
            random_sleep()
    if self.testMode and self.longWFName:
        print("\n*** WARNING: " + str(len(self.longWFName)) + " workflows have too long names for submission (>" + str(MAXWORKFLOWLENGTH) + " characters) ***")
        print('\n'.join(self.longWFName))
def prepare(self, mReader, directories, mode='init')
def performInjectionOptionTest(opt)
def uploadConf(self, filePath, label, where)
provide the number of tasks
def replace(string, replacements)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
RequiresGPU
Checking and setting up GPU attributes.
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def __init__(self, opt, mode='init', options='')
#define update(a, b)
#define str(s)
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
def upload_to_couch_oneArg(arguments)