CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MatrixInjector.py
Go to the documentation of this file.
1 import sys
2 import json
3 import os
4 import copy
5 import multiprocessing
6 
8  if opt.show:
9  print 'Not injecting to wmagent in --show mode. Need to run the worklfows.'
10  sys.exit(-1)
11  if opt.wmcontrol=='init':
12  #init means it'll be in test mode
13  opt.nThreads=0
14  if opt.wmcontrol=='test':
15  #means the wf were created already, and we just dryRun it.
16  opt.dryRun=True
17  if opt.wmcontrol=='submit' and opt.nThreads==0:
18  print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.'
19  sys.exit(-1)
20  if opt.wmcontrol=='force':
21  print "This is an expert setting, you'd better know what you're doing"
22  opt.dryRun=True
23 
24 def upload_to_couch_oneArg(arguments):
25  from modules.wma import upload_to_couch
26  (filePath,labelInCouch,user,group,where) = arguments
27  cacheId=upload_to_couch(filePath,
28  labelInCouch,
29  user,
30  group,
31  test_mode=False,
32  url=where)
33  return cacheId
34 
35 
37 
38  def __init__(self,opt,mode='init',options=''):
39  self.count=1040
40 
41  self.dqmgui=None
42  self.wmagent=None
43  for k in options.split(','):
44  if k.startswith('dqm:'):
45  self.dqmgui=k.split(':',1)[-1]
46  elif k.startswith('wma:'):
47  self.wmagent=k.split(':',1)[-1]
48 
49  self.testMode=((mode!='submit') and (mode!='force'))
50  self.version =1
51  self.keep = opt.keep
52 
53  #wagemt stuff
54  if not self.wmagent:
55  self.wmagent=os.getenv('WMAGENT_REQMGR')
56  if not self.wmagent:
57  self.wmagent = 'cmsweb.cern.ch'
58 
59  if not self.dqmgui:
60  self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
61  #couch stuff
62  self.couch = 'https://'+self.wmagent+'/couchdb'
63 # self.couchDB = 'reqmgr_config_cache'
64  self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
65  self.user = os.getenv('USER')
66  self.group = 'ppd'
67  self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
68  self.speciallabel=''
69  if opt.label:
70  self.speciallabel= '_'+opt.label
71 
72 
73  if not os.getenv('WMCORE_ROOT'):
74  print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
75  if not self.testMode:
76  print '\n\t QUIT\n'
77  sys.exit(-18)
78  else:
79  print '\n\tFound wmclient\n'
80 
81  self.defaultChain={
82  "RequestType" : "TaskChain", #this is how we handle relvals
83  "Requestor": self.user, #Person responsible
84  "Group": self.group, #group for the request
85  "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
86  "Campaign": os.getenv('CMSSW_VERSION'), # only for wmstat purpose
87  "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain)
88  "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain)
89  "GlobalTag": None, #Global Tag (overridden per task)
90  "CouchURL": self.couch, #URL of CouchDB containing Config Cache
91  "ConfigCacheURL": self.couch, #URL of CouchDB containing Config Cache
92  "DbsUrl": "https://cmsweb.cern.ch/dbs/prod/global/DBSReader",
93  #- Will contain all configs for all Tasks
94  #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
95  "TaskChain" : None, #Define number of tasks in chain.
96  "nowmTasklist" : [], #a list of tasks as we put them in
97  "unmergedLFNBase" : "/store/unmerged",
98  "mergedLFNBase" : "/store/relval",
99  "dashboardActivity" : "relval",
100  "Memory" : 2400,
101  "SizePerEvent" : 1234,
102  "TimePerEvent" : 20
103  }
104 
106  "EnableHarvesting" : "True",
107  "DQMUploadUrl" : self.dqmgui,
108  "DQMConfigCacheID" : None
109  }
110 
112  "TaskName" : None, #Task Name
113  "ConfigCacheID" : None, #Generator Config id
114  "GlobalTag": None,
115  "SplittingAlgo" : "EventBased", #Splitting Algorithm
116  "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm
117  "RequestNumEvents" : None, #Total number of events to generate
118  "Seeding" : "AutomaticSeeding", #Random seeding method
119  "PrimaryDataset" : None, #Primary Dataset to be created
120  "nowmIO": {},
121  "KeepOutput" : False
122  }
124  "TaskName" : "DigiHLT", #Task Name
125  "ConfigCacheID" : None, #Processing Config id
126  "GlobalTag": None,
127  "InputDataset" : None, #Input Dataset to be processed
128  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
129  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
130  "nowmIO": {},
131  "KeepOutput" : False
132  }
133  self.defaultTask={
134  "TaskName" : None, #Task Name
135  "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry)
136  "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process
137  "ConfigCacheID" : None, #Processing Config id
138  "GlobalTag": None,
139  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
140  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
141  "nowmIO": {},
142  "KeepOutput" : False
143  }
144 
145  self.chainDicts={}
146 
147 
148  def prepare(self,mReader, directories, mode='init'):
149  try:
150  #from Configuration.PyReleaseValidation.relval_steps import wmsplit
151  wmsplit = {}
152  wmsplit['DIGIHI']=5
153  wmsplit['RECOHI']=5
154  wmsplit['HLTD']=5
155  wmsplit['RECODreHLT']=2
156  wmsplit['DIGIPU']=4
157  wmsplit['DIGIPU1']=4
158  wmsplit['RECOPU1']=1
159  wmsplit['DIGIUP15_PU50']=1
160  wmsplit['RECOUP15_PU50']=1
161  wmsplit['DIGIUP15_PU25']=1
162  wmsplit['RECOUP15_PU25']=1
163  wmsplit['DIGIHISt3']=5
164  wmsplit['RECODSplit']=1
165  wmsplit['SingleMuPt10_ID']=1
166  wmsplit['DIGI_ID']=1
167  wmsplit['RECO_ID']=1
168  wmsplit['TTbar_ID']=1
169  wmsplit['SingleMuPt10FS_ID']=1
170  wmsplit['TTbarFS_ID']=1
171 
172  #import pprint
173  #pprint.pprint(wmsplit)
174  except:
175  print "Not set up for step splitting"
176  wmsplit={}
177 
178  acqEra=False
179  for (n,dir) in directories.items():
180  chainDict=copy.deepcopy(self.defaultChain)
181  print "inspecting",dir
182  nextHasDSInput=None
183  for (x,s) in mReader.workFlowSteps.items():
184  #x has the format (num, prefix)
185  #s has the format (num, name, commands, stepList)
186  if x[0]==n:
187  #print "found",n,s[3]
188  #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
189  index=0
190  splitForThisWf=None
191  thisLabel=self.speciallabel
192  processStrPrefix=''
193  setPrimaryDs=None
194  for step in s[3]:
195 
196  if 'INPUT' in step or (not isinstance(s[2][index],str)):
197  nextHasDSInput=s[2][index]
198 
199  else:
200 
201  if (index==0):
202  #first step and not input -> gen part
203  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
204  try:
205  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
206  except:
207  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
208  return -15
209 
210  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
211  if not '--relval' in s[2][index]:
212  print 'Impossible to create task from scratch without splitting information with --relval'
213  return -12
214  else:
215  arg=s[2][index].split()
216  ns=map(int,arg[arg.index('--relval')+1].split(','))
217  chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
218  chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
219  if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
220  thisLabel+='_FastSim'
221  if 'lhe' in s[2][index] in s[2][index]:
222  chainDict['nowmTasklist'][-1]['LheInputFiles'] =True
223 
224  elif nextHasDSInput:
225  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
226  try:
227  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
228  except:
229  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
230  return -15
231  chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
232  splitForThisWf=nextHasDSInput.split
233  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
234  if step in wmsplit:
235  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
236  # get the run numbers or #events
237  if len(nextHasDSInput.run):
238  chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
239  #print "what is s",s[2][index]
240  if '--data' in s[2][index] and nextHasDSInput.label:
241  thisLabel+='_RelVal_%s'%nextHasDSInput.label
242  if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
243  print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
244  processStrPrefix='PU_'
245  setPrimaryDs = 'RelVal'+s[1].split('+')[0]
246  if setPrimaryDs:
247  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
248  nextHasDSInput=None
249  else:
250  #not first step and no inputDS
251  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
252  try:
253  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
254  except:
255  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
256  return -15
257  if splitForThisWf:
258  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
259  if step in wmsplit:
260  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
261 
262  #print step
263  chainDict['nowmTasklist'][-1]['TaskName']=step
264  if setPrimaryDs:
265  chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
266  chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
267  chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
268  chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
269  if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
270  chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
271  if '--pileup ' in s[2][index]: # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
272  processStrPrefix='PU_' # take care of pu overlay done with GEN-SIM mixing
273  if ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('25ns') > 0 :
274  processStrPrefix='PU25ns_'
275  elif ( s[2][index].split()[ s[2][index].split().index('--pileup')+1 ] ).find('50ns') > 0 :
276  processStrPrefix='PU50ns_'
277  if 'DIGIPREMIX_S2' in s[2][index] : # take care of pu overlay done with DIGI mixing of premixed events
278  if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('25ns') > 0 :
279  processStrPrefix='PUpmx25ns_'
280  elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1 ].find('50ns') > 0 :
281  processStrPrefix='PUpmx50ns_'
282 
283  if acqEra:
284  #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
285  chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
286  chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
287  else:
288  #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
289  chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
290  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
291 
292  index+=1
293  #end of loop through steps
294  chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
295  if processStrPrefix or thisLabel:
296  chainDict['RequestString']+='_'+processStrPrefix+thisLabel
297 
298 
299 
300  #wrap up for this one
301  import pprint
302  #print 'wrapping up'
303  #pprint.pprint(chainDict)
304  #loop on the task list
305  for i_second in reversed(range(len(chainDict['nowmTasklist']))):
306  t_second=chainDict['nowmTasklist'][i_second]
307  #print "t_second taskname", t_second['TaskName']
308  if 'primary' in t_second['nowmIO']:
309  #print t_second['nowmIO']['primary']
310  primary=t_second['nowmIO']['primary'][0].replace('file:','')
311  for i_input in reversed(range(0,i_second)):
312  t_input=chainDict['nowmTasklist'][i_input]
313  for (om,o) in t_input['nowmIO'].items():
314  if primary in o:
315  #print "found",primary,"procuced by",om,"of",t_input['TaskName']
316  t_second['InputTask'] = t_input['TaskName']
317  t_second['InputFromOutputModule'] = om
318  #print 't_second',pprint.pformat(t_second)
319  if t_second['TaskName'].startswith('HARVEST'):
320  chainDict.update(copy.deepcopy(self.defaultHarvest))
321  chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
322  ## the info are not in the task specific dict but in the general dict
323  #t_input.update(copy.deepcopy(self.defaultHarvest))
324  #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
325  break
326 
327  ## there is in fact only one acquisition era
328  #if len(set(chainDict['AcquisitionEra'].values()))==1:
329  # print "setting only one acq"
330  if acqEra:
331  chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
332 
333  ## clean things up now
334  itask=0
335  if self.keep:
336  for i in self.keep:
337  if type(i)==int and i < len(chainDict['nowmTasklist']):
338  chainDict['nowmTasklist'][i]['KeepOutput']=True
339  for (i,t) in enumerate(chainDict['nowmTasklist']):
340  if t['TaskName'].startswith('HARVEST'):
341  continue
342  if not self.keep:
343  t['KeepOutput']=True
344  elif t['TaskName'] in self.keep:
345  t['KeepOutput']=True
346  t.pop('nowmIO')
347  itask+=1
348  chainDict['Task%d'%(itask)]=t
349 
350 
351  ##
352 
353 
354  ## provide the number of tasks
355  chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
356 
357  chainDict.pop('nowmTasklist')
358  self.chainDicts[n]=chainDict
359 
360 
361  return 0
362 
363  def uploadConf(self,filePath,label,where):
364  labelInCouch=self.label+'_'+label
365  cacheName=filePath.split('/')[-1]
366  if self.testMode:
367  self.count+=1
368  print '\tFake upload of',filePath,'to couch with label',labelInCouch
369  return self.count
370  else:
371  try:
372  from modules.wma import upload_to_couch,DATABASE_NAME
373  except:
374  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
375  print '\n\t QUIT\n'
376  sys.exit(-16)
377 
378  if cacheName in self.couchCache:
379  print "Not re-uploading",filePath,"to",where,"for",label
380  cacheId=self.couchCache[cacheName]
381  else:
382  print "Loading",filePath,"to",where,"for",label
383  ## totally fork the upload to couch to prevent cross loading of process configurations
384  pool = multiprocessing.Pool(1)
385  cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
386  cacheId = cacheIds[0]
387  self.couchCache[cacheName]=cacheId
388  return cacheId
389 
390  def upload(self):
391  for (n,d) in self.chainDicts.items():
392  for it in d:
393  if it.startswith("Task") and it!='TaskChain':
394  #upload
395  couchID=self.uploadConf(d[it]['ConfigCacheID'],
396  str(n)+d[it]['TaskName'],
397  d['CouchURL']
398  )
399  print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID
400  d[it]['ConfigCacheID']=couchID
401  if it =='DQMConfigCacheID':
402  couchID=self.uploadConf(d['DQMConfigCacheID'],
403  str(n)+'harvesting',
404  d['CouchURL']
405  )
406  print d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
407  d['DQMConfigCacheID']=couchID
408 
409 
410  def submit(self):
411  try:
412  from modules.wma import makeRequest,approveRequest
413  from wmcontrol import random_sleep
414  print '\n\tFound wmcontrol\n'
415  except:
416  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
417  if not self.testMode:
418  print '\n\t QUIT\n'
419  sys.exit(-17)
420 
421  import pprint
422  for (n,d) in self.chainDicts.items():
423  if self.testMode:
424  print "Only viewing request",n
425  print pprint.pprint(d)
426  else:
427  #submit to wmagent each dict
428  print "For eyes before submitting",n
429  print pprint.pprint(d)
430  print "Submitting",n,"..........."
431  workFlow=makeRequest(self.wmagent,d,encodeDict=True)
432  approveRequest(self.wmagent,workFlow)
433  print "...........",n,"submitted"
434  random_sleep()
435 
436 
437 
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
list object
Definition: dbtoconf.py:77
def performInjectionOptionTest
def upload_to_couch_oneArg
double split
Definition: MVATrainer.cc:139