CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MatrixInjector.py
Go to the documentation of this file.
1 import sys
2 import json
3 import os
4 import copy
5 import multiprocessing
6 
8  if opt.show:
9  print 'Not injecting to wmagent in --show mode. Need to run the worklfows.'
10  sys.exit(-1)
11  if opt.wmcontrol=='init':
12  #init means it'll be in test mode
13  opt.nProcs=0
14  if opt.wmcontrol=='test':
15  #means the wf were created already, and we just dryRun it.
16  opt.dryRun=True
17  if opt.wmcontrol=='submit' and opt.nProcs==0:
18  print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.'
19  sys.exit(-1)
20  if opt.wmcontrol=='force':
21  print "This is an expert setting, you'd better know what you're doing"
22  opt.dryRun=True
23 
def upload_to_couch_oneArg(arguments):
    """Single-argument adapter around modules.wma.upload_to_couch.

    multiprocessing.Pool.map hands each worker exactly one object, so the
    five real arguments arrive packed in one tuple and are unpacked here.
    Returns the couch cache id produced by the upload.
    """
    from modules.wma import upload_to_couch
    filePath, labelInCouch, user, group, where = arguments
    return upload_to_couch(filePath,
                           labelInCouch,
                           user,
                           group,
                           test_mode=False,
                           url=where)
34 
35 
36 class MatrixInjector(object):
37 
38  def __init__(self,opt,mode='init',options=''):
39  self.count=1040
40 
41  self.dqmgui=None
42  self.wmagent=None
43  for k in options.split(','):
44  if k.startswith('dqm:'):
45  self.dqmgui=k.split(':',1)[-1]
46  elif k.startswith('wma:'):
47  self.wmagent=k.split(':',1)[-1]
48 
49  self.testMode=((mode!='submit') and (mode!='force'))
50  self.version =1
51  self.keep = opt.keep
52 
53  #wagemt stuff
54  if not self.wmagent:
55  self.wmagent=os.getenv('WMAGENT_REQMGR')
56  if not self.wmagent:
57  if not opt.testbed :
58  self.wmagent = 'cmsweb.cern.ch'
59  self.DbsUrl = "https://"+self.wmagent+"/dbs/prod/global/DBSReader"
60  else :
61  self.wmagent = 'cmsweb-testbed.cern.ch'
62  self.DbsUrl = "https://"+self.wmagent+"/dbs/int/global/DBSReader"
63 
64  if not self.dqmgui:
65  self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
66  #couch stuff
67  self.couch = 'https://'+self.wmagent+'/couchdb'
68 # self.couchDB = 'reqmgr_config_cache'
69  self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
70  self.user = os.getenv('USER')
71  self.group = 'ppd'
72  self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
73  self.speciallabel=''
74  if opt.label:
75  self.speciallabel= '_'+opt.label
76 
77 
78  if not os.getenv('WMCORE_ROOT'):
79  print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
80  if not self.testMode:
81  print '\n\t QUIT\n'
82  sys.exit(-18)
83  else:
84  print '\n\tFound wmclient\n'
85 
86  self.defaultChain={
87  "RequestType" : "TaskChain", #this is how we handle relvals
88  "SubRequestType" : "RelVal", #this is how we handle relvals, now that TaskChain is also used for central MC production
89  "RequestPriority": 500000,
90  "Requestor": self.user, #Person responsible
91  "Group": self.group, #group for the request
92  "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
93  "Campaign": os.getenv('CMSSW_VERSION'), # only for wmstat purpose
94  "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain)
95  "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain)
96  "GlobalTag": None, #Global Tag (overridden per task)
97  "CouchURL": self.couch, #URL of CouchDB containing Config Cache
98  "ConfigCacheURL": self.couch, #URL of CouchDB containing Config Cache
99  "DbsUrl": self.DbsUrl,
100  #- Will contain all configs for all Tasks
101  #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
102  "TaskChain" : None, #Define number of tasks in chain.
103  "nowmTasklist" : [], #a list of tasks as we put them in
104  "unmergedLFNBase" : "/store/unmerged",
105  "mergedLFNBase" : "/store/relval",
106  "dashboardActivity" : "relval",
107  "Multicore" : opt.nThreads,
108  "Memory" : 3000,
109  "SizePerEvent" : 1234,
110  "TimePerEvent" : 0.1
111  }
112 
114  "EnableHarvesting" : "True",
115  "DQMUploadUrl" : self.dqmgui,
116  "DQMConfigCacheID" : None
117  }
118 
120  "TaskName" : None, #Task Name
121  "ConfigCacheID" : None, #Generator Config id
122  "GlobalTag": None,
123  "SplittingAlgo" : "EventBased", #Splitting Algorithm
124  "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm
125  "RequestNumEvents" : None, #Total number of events to generate
126  "Seeding" : "AutomaticSeeding", #Random seeding method
127  "PrimaryDataset" : None, #Primary Dataset to be created
128  "nowmIO": {},
129  "KeepOutput" : False
130  }
132  "TaskName" : "DigiHLT", #Task Name
133  "ConfigCacheID" : None, #Processing Config id
134  "GlobalTag": None,
135  "InputDataset" : None, #Input Dataset to be processed
136  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
137  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
138  "nowmIO": {},
139  "KeepOutput" : False
140  }
141  self.defaultTask={
142  "TaskName" : None, #Task Name
143  "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry)
144  "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process
145  "ConfigCacheID" : None, #Processing Config id
146  "GlobalTag": None,
147  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
148  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
149  "nowmIO": {},
150  "KeepOutput" : False
151  }
152 
153  self.chainDicts={}
154 
155 
156  def prepare(self,mReader, directories, mode='init'):
157  try:
158  #from Configuration.PyReleaseValidation.relval_steps import wmsplit
159  wmsplit = {}
160  wmsplit['DIGIHI']=5
161  wmsplit['RECOHI']=5
162  wmsplit['HLTD']=5
163  wmsplit['RECODreHLT']=2
164  wmsplit['DIGIPU']=4
165  wmsplit['DIGIPU1']=4
166  wmsplit['RECOPU1']=1
167  wmsplit['DIGIUP15_PU50']=1
168  wmsplit['RECOUP15_PU50']=1
169  wmsplit['DIGIUP15_PU25']=1
170  wmsplit['RECOUP15_PU25']=1
171  wmsplit['DIGIUP15_PU25HS']=1
172  wmsplit['RECOUP15_PU25HS']=1
173  wmsplit['DIGIHIMIX']=5
174  wmsplit['RECOHIMIX']=5
175  wmsplit['RECODSplit']=1
176  wmsplit['SingleMuPt10_UP15_ID']=1
177  wmsplit['DIGIUP15_ID']=1
178  wmsplit['RECOUP15_ID']=1
179  wmsplit['TTbar_13_ID']=1
180  wmsplit['SingleMuPt10FS_ID']=1
181  wmsplit['TTbarFS_ID']=1
182  wmsplit['RECODR2_50nsreHLT']=1
183  wmsplit['RECODR2_25nsreHLT']=1
184  wmsplit['HLTDR2_50ns']=1
185  wmsplit['HLTDR2_25ns']=1
186  wmsplit['Hadronizer']=1
187  wmsplit['DIGIUP15']=5
188  wmsplit['RECOUP15']=5
189  wmsplit['RECOAODUP15']=5
190  wmsplit['DBLMINIAODMCUP15NODQM']=5
191 
192  #import pprint
193  #pprint.pprint(wmsplit)
194  except:
195  print "Not set up for step splitting"
196  wmsplit={}
197 
198  acqEra=False
199  for (n,dir) in directories.items():
200  chainDict=copy.deepcopy(self.defaultChain)
201  print "inspecting",dir
202  nextHasDSInput=None
203  for (x,s) in mReader.workFlowSteps.items():
204  #x has the format (num, prefix)
205  #s has the format (num, name, commands, stepList)
206  if x[0]==n:
207  #print "found",n,s[3]
208  #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
209  index=0
210  splitForThisWf=None
211  thisLabel=self.speciallabel
212  #if 'HARVESTGEN' in s[3]:
213  if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
214  chainDict['TimePerEvent']=0.01
215  thisLabel=thisLabel+"_gen"
216  # for double miniAOD test
217  if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
218  thisLabel=thisLabel+"_dblMiniAOD"
219  processStrPrefix=''
220  setPrimaryDs=None
221  for step in s[3]:
222 
223  if 'INPUT' in step or (not isinstance(s[2][index],str)):
224  nextHasDSInput=s[2][index]
225 
226  else:
227 
228  if (index==0):
229  #first step and not input -> gen part
230  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
231  try:
232  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
233  except:
234  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
235  return -15
236 
237  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
238  if not '--relval' in s[2][index]:
239  print 'Impossible to create task from scratch without splitting information with --relval'
240  return -12
241  else:
242  arg=s[2][index].split()
243  ns=map(int,arg[arg.index('--relval')+1].split(','))
244  chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
245  chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
246  if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
247  thisLabel+='_FastSim'
248  if 'lhe' in s[2][index] in s[2][index]:
249  chainDict['nowmTasklist'][-1]['LheInputFiles'] =True
250 
251  elif nextHasDSInput:
252  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
253  try:
254  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
255  except:
256  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
257  return -15
258  chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
259  splitForThisWf=nextHasDSInput.split
260  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
261  if step in wmsplit:
262  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
263  # get the run numbers or #events
264  if len(nextHasDSInput.run):
265  chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
266  if len(nextHasDSInput.ls):
267  chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
268  #print "what is s",s[2][index]
269  if '--data' in s[2][index] and nextHasDSInput.label:
270  thisLabel+='_RelVal_%s'%nextHasDSInput.label
271  if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
272  print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
273  processStrPrefix='PU_'
274  setPrimaryDs = 'RelVal'+s[1].split('+')[0]
275  if setPrimaryDs:
276  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
277  nextHasDSInput=None
278  else:
279  #not first step and no inputDS
280  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
281  try:
282  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
283  except:
284  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
285  return -15
286  if splitForThisWf:
287  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
288  if step in wmsplit:
289  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
290 
291  # change LumisPerJob for Hadronizer steps.
292  if 'Hadronizer' in step:
293  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']
294 
295  #print step
296  chainDict['nowmTasklist'][-1]['TaskName']=step
297  if setPrimaryDs:
298  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
299  chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
300  chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
301  chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
302  if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
303  chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
304  if '--pileup ' in s[2][index]: # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
305  processStrPrefix='PU_' # take care of pu overlay done with GEN-SIM mixing
306  if ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('25ns') > 0 :
307  processStrPrefix='PU25ns_'
308  elif ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('50ns') > 0 :
309  processStrPrefix='PU50ns_'
310  if 'DIGIPREMIX_S2' in s[2][index] : # take care of pu overlay done with DIGI mixing of premixed events
311  if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('25ns') > 0 :
312  processStrPrefix='PUpmx25ns_'
313  elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('50ns') > 0 :
314  processStrPrefix='PUpmx50ns_'
315 
316  if acqEra:
317  #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
318  chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
319  chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
320  else:
321  #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
322  chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
323  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
324 
325  # specify different ProcessingString for double miniAOD dataset
326  if ('DBLMINIAODMCUP15NODQM' in step):
327  chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD'
328 
329  # modify memory settings for Multicore processing
330  # implemented with reference to franzoni:multithread-CMSSW_7_4_3
331  if(chainDict['Multicore']>1):
332  # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
333  # the number of threads is assumed to be the same for all tasks
334  # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
335  chainDict['nowmTasklist'][-1]['Memory']= 3000 + int( chainDict['Multicore'] -1)*1200
336  # set also the overall memory to the same value; the agreement (in the phasing in) is that
337  chainDict['Memory'] = 3000 + int( chainDict['Multicore'] -1)*1200
338 
339  index+=1
340  #end of loop through steps
341  chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
342  if processStrPrefix or thisLabel:
343  chainDict['RequestString']+='_'+processStrPrefix+thisLabel
344 
345 
346 
347  #wrap up for this one
348  import pprint
349  #print 'wrapping up'
350  #pprint.pprint(chainDict)
351  #loop on the task list
352  for i_second in reversed(range(len(chainDict['nowmTasklist']))):
353  t_second=chainDict['nowmTasklist'][i_second]
354  #print "t_second taskname", t_second['TaskName']
355  if 'primary' in t_second['nowmIO']:
356  #print t_second['nowmIO']['primary']
357  primary=t_second['nowmIO']['primary'][0].replace('file:','')
358  for i_input in reversed(range(0,i_second)):
359  t_input=chainDict['nowmTasklist'][i_input]
360  for (om,o) in t_input['nowmIO'].items():
361  if primary in o:
362  #print "found",primary,"procuced by",om,"of",t_input['TaskName']
363  t_second['InputTask'] = t_input['TaskName']
364  t_second['InputFromOutputModule'] = om
365  #print 't_second',pprint.pformat(t_second)
366  if t_second['TaskName'].startswith('HARVEST'):
367  chainDict.update(copy.deepcopy(self.defaultHarvest))
368  chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
369  ## the info are not in the task specific dict but in the general dict
370  #t_input.update(copy.deepcopy(self.defaultHarvest))
371  #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
372  break
373 
374  ## there is in fact only one acquisition era
375  #if len(set(chainDict['AcquisitionEra'].values()))==1:
376  # print "setting only one acq"
377  if acqEra:
378  chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
379 
380  ## clean things up now
381  itask=0
382  if self.keep:
383  for i in self.keep:
384  if type(i)==int and i < len(chainDict['nowmTasklist']):
385  chainDict['nowmTasklist'][i]['KeepOutput']=True
386  for (i,t) in enumerate(chainDict['nowmTasklist']):
387  if t['TaskName'].startswith('HARVEST'):
388  continue
389  if not self.keep:
390  t['KeepOutput']=True
391  elif t['TaskName'] in self.keep:
392  t['KeepOutput']=True
393  t.pop('nowmIO')
394  itask+=1
395  chainDict['Task%d'%(itask)]=t
396 
397 
398  ##
399 
400 
401  ## provide the number of tasks
402  chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
403 
404  chainDict.pop('nowmTasklist')
405  self.chainDicts[n]=chainDict
406 
407 
408  return 0
409 
410  def uploadConf(self,filePath,label,where):
411  labelInCouch=self.label+'_'+label
412  cacheName=filePath.split('/')[-1]
413  if self.testMode:
414  self.count+=1
415  print '\tFake upload of',filePath,'to couch with label',labelInCouch
416  return self.count
417  else:
418  try:
419  from modules.wma import upload_to_couch,DATABASE_NAME
420  except:
421  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
422  print '\n\t QUIT\n'
423  sys.exit(-16)
424 
425  if cacheName in self.couchCache:
426  print "Not re-uploading",filePath,"to",where,"for",label
427  cacheId=self.couchCache[cacheName]
428  else:
429  print "Loading",filePath,"to",where,"for",label
430  ## totally fork the upload to couch to prevent cross loading of process configurations
431  pool = multiprocessing.Pool(1)
432  cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
433  cacheId = cacheIds[0]
434  self.couchCache[cacheName]=cacheId
435  return cacheId
436 
437  def upload(self):
438  for (n,d) in self.chainDicts.items():
439  for it in d:
440  if it.startswith("Task") and it!='TaskChain':
441  #upload
442  couchID=self.uploadConf(d[it]['ConfigCacheID'],
443  str(n)+d[it]['TaskName'],
444  d['CouchURL']
445  )
446  print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID
447  d[it]['ConfigCacheID']=couchID
448  if it =='DQMConfigCacheID':
449  couchID=self.uploadConf(d['DQMConfigCacheID'],
450  str(n)+'harvesting',
451  d['CouchURL']
452  )
453  print d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
454  d['DQMConfigCacheID']=couchID
455 
456 
457  def submit(self):
458  try:
459  from modules.wma import makeRequest,approveRequest
460  from wmcontrol import random_sleep
461  print '\n\tFound wmcontrol\n'
462  except:
463  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
464  if not self.testMode:
465  print '\n\t QUIT\n'
466  sys.exit(-17)
467 
468  import pprint
469  for (n,d) in self.chainDicts.items():
470  if self.testMode:
471  print "Only viewing request",n
472  print pprint.pprint(d)
473  else:
474  #submit to wmagent each dict
475  print "For eyes before submitting",n
476  print pprint.pprint(d)
477  print "Submitting",n,"..........."
478  workFlow=makeRequest(self.wmagent,d,encodeDict=True)
479  approveRequest(self.wmagent,workFlow)
480  print "...........",n,"submitted"
481  random_sleep()
482 
483 
484 
boost::dynamic_bitset append(const boost::dynamic_bitset<> &bs1, const boost::dynamic_bitset<> &bs2)
this method takes two bitsets bs1 and bs2 and returns result of bs2 appended to the end of bs1 ...
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
def performInjectionOptionTest
if(dp >Float(M_PI)) dp-
def upload_to_couch_oneArg
double split
Definition: MVATrainer.cc:139