CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MatrixInjector.py
Go to the documentation of this file.
1 import sys
2 import json
3 import os
4 import copy
5 import multiprocessing
6 
# NOTE(review): the `def` line of this function (original line 7) is missing
# from this listing; the page's cross-reference index names it
# performInjectionOptionTest, and `opt` is presumably its only parameter.
# Checks and adjusts the parsed command-line options before wmagent
# injection: exits with status -1 for option combinations that cannot be
# injected, and otherwise sets nThreads / dryRun according to opt.wmcontrol.
8  if opt.show:
9  print 'Not injecting to wmagent in --show mode. Need to run the worklfows.'
10  sys.exit(-1)
11  if opt.wmcontrol=='init':
12  #init means it'll be in test mode
13  opt.nThreads=0
14  if opt.wmcontrol=='test':
15  #means the wf were created already, and we just dryRun it.
16  opt.dryRun=True
# 'submit' is refused when nThreads==0 (the message says the workflows
# still need to be run)
17  if opt.wmcontrol=='submit' and opt.nThreads==0:
18  print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.'
19  sys.exit(-1)
# 'force' prints an expert warning and then, like 'test', only sets dryRun
20  if opt.wmcontrol=='force':
21  print "This is an expert setting, you'd better know what you're doing"
22  opt.dryRun=True
23 
def upload_to_couch_oneArg(arguments):
    """Upload a single configuration file to the couch config cache.

    All inputs arrive packed in one tuple so that this function can be handed
    directly to ``multiprocessing.Pool.map`` (see ``uploadConf``).

    :param arguments: tuple ``(filePath, labelInCouch, user, group, where)``;
        ``where`` is forwarded to ``upload_to_couch`` as ``url``.
    :returns: the value returned by ``modules.wma.upload_to_couch``, which the
        caller stores as the config cache id.
    """
    # imported inside the function, so modules.wma is only needed when an
    # upload actually runs
    from modules.wma import upload_to_couch
    file_path, couch_label, requestor, group_name, couch_url = arguments
    return upload_to_couch(file_path, couch_label, requestor, group_name,
                           test_mode=False, url=couch_url)
34 
35 
37 
38  def __init__(self,opt,mode='init'):
# Set up the injector: wmagent/couch endpoints, request labels, and the
# template dicts that prepare() deep-copies for every request and task.
# NOTE(review): this is a method of the injector class, whose `class` line
# (presumably original line 36) is missing from this listing.
#  opt  -- parsed options; .keep and .label are read here
#  mode -- wmcontrol mode; anything other than 'submit' or 'force' is test mode
# starting value of the fake cache ids that uploadConf returns in test mode
39  self.count=1040
40  self.testMode=((mode!='submit') and (mode!='force'))
41  self.version =1
42  self.keep = opt.keep
43 
44  #wmagent stuff (host from $WMAGENT_REQMGR, default cmsweb.cern.ch)
45  self.wmagent=os.getenv('WMAGENT_REQMGR')
46  if not self.wmagent:
47  self.wmagent = 'cmsweb.cern.ch'
48 
49  #couch stuff
50  self.couch = 'https://'+self.wmagent+'/couchdb'
51 # self.couchDB = 'reqmgr_config_cache'
52  self.couchCache={} # so that we do not upload like crazy, and recycle cfgs
53  self.user = os.getenv('USER')
54  self.group = 'ppd'
# NOTE(review): os.getenv returns None when CMSSW_VERSION is unset, and then
# .replace raises AttributeError -- a CMSSW environment is required here.
55  self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
56  self.speciallabel=''
57  if opt.label:
58  self.speciallabel= '_'+opt.label
59 
60 
# Without $WMCORE_ROOT only a warning is printed in test mode; outside test
# mode the process exits with status -18.
61  if not os.getenv('WMCORE_ROOT'):
62  print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
63  if not self.testMode:
64  print '\n\t QUIT\n'
65  sys.exit(-18)
66  else:
67  print '\n\tFound wmclient\n'
68 
# Chain-level template for one TaskChain request (deep-copied per workflow in
# prepare()); keys starting with 'nowm' are internal bookkeeping that
# prepare() removes before the dict is stored.
69  self.defaultChain={
70  "RequestType" : "TaskChain", #this is how we handle relvals
71  "Requestor": self.user, #Person responsible
72  "Group": self.group, #group for the request
73  "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
74  "Campaign": os.getenv('CMSSW_VERSION'), # only for wmstat purpose
75  "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain)
76  "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain)
77  "GlobalTag": None, #Global Tag (overridden per task)
78  "CouchURL": self.couch, #URL of CouchDB containing Config Cache
79  "ConfigCacheURL": self.couch, #URL of CouchDB containing Config Cache
80  "DbsUrl": "https://cmsweb.cern.ch/dbs/prod/global/DBSReader",
81  #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
82  "TaskChain" : None, #Define number of tasks in chain.
83  "nowmTasklist" : [], #a list of tasks as we put them in
84  "unmergedLFNBase" : "/store/unmerged",
85  "mergedLFNBase" : "/store/relval",
86  "dashboardActivity" : "relval",
87  "Memory" : 2400,
88  "SizePerEvent" : 1234,
89  "TimePerEvent" : 20
90  }
91 
# NOTE(review): the line opening this dict (original line 92) is missing from
# this listing; it is presumably `self.defaultHarvest={`, the template that
# prepare() merges into the chain dict when a HARVEST task is found.
93  "EnableHarvesting" : "True",
94  "DQMUploadUrl" : "https://cmsweb.cern.ch/dqm/relval",
95  "DQMConfigCacheID" : None
96  }
97 
# NOTE(review): the line opening this dict (original line 98) is missing from
# this listing; it is presumably `self.defaultScratch={`, the template that
# prepare() uses for a first step generated from scratch.
99  "TaskName" : None, #Task Name
100  "ConfigCacheID" : None, #Generator Config id
101  "GlobalTag": None,
102  "SplittingAlgo" : "EventBased", #Splitting Algorithm
103  "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm
104  "RequestNumEvents" : None, #Total number of events to generate
105  "Seeding" : "AutomaticSeeding", #Random seeding method
106  "PrimaryDataset" : None, #Primary Dataset to be created
107  "nowmIO": {},
108  "KeepOutput" : False
109  }
# NOTE(review): the line opening this dict (original line 110) is missing from
# this listing; it is presumably `self.defaultInput={`, the template that
# prepare() uses for a step reading an input dataset.
111  "TaskName" : "DigiHLT", #Task Name
112  "ConfigCacheID" : None, #Processing Config id
113  "GlobalTag": None,
114  "InputDataset" : None, #Input Dataset to be processed
115  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
116  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
117  "nowmIO": {},
118  "KeepOutput" : False
119  }
# Template for a task that processes the output of an earlier task in the chain.
120  self.defaultTask={
121  "TaskName" : None, #Task Name
122  "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry)
123  "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process
124  "ConfigCacheID" : None, #Processing Config id
125  "GlobalTag": None,
126  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
127  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
128  "nowmIO": {},
129  "KeepOutput" : False
130  }
131 
# workflow number -> finished request dict, filled by prepare()
132  self.chainDicts={}
133 
134 
135  def prepare(self,mReader, directories, mode='init'):
# Build one TaskChain request dict per workflow and store it in self.chainDicts.
#  mReader     -- its workFlowSteps maps x=(num, prefix) -> s=(num, name, commands, stepList)
#  directories -- maps workflow number n -> directory holding <step>.io / <step>.py files
#  mode        -- accepted but not used in the visible body
# Returns 0 on success, -15 when a <step>.io file cannot be read, and -12 when
# the first (from-scratch) step has no --relval splitting information.
136  try:
# NOTE(review): original line 137 is missing from this listing; it presumably
# defines `wmsplit` (step name -> LumisPerJob override, as used below). As
# listed, `wmsplit` is undefined here, so the bare except catches the
# NameError and falls back to an empty map.
138  import pprint
139  pprint.pprint(wmsplit)
140  except:
141  print "Not set up for step splitting"
142  wmsplit={}
143 
# NOTE(review): acqEra is never set to True in this method, so every
# `if acqEra:` branch below is unreachable as written.
144  acqEra=False
145  for (n,dir) in directories.items():
146  chainDict=copy.deepcopy(self.defaultChain)
147  print "inspecting",dir
# holds the s[2] entry of an INPUT step until the next real step consumes it
148  nextHasDSInput=None
149  for (x,s) in mReader.workFlowSteps.items():
150  #x has the format (num, prefix)
151  #s has the format (num, name, commands, stepList)
152  if x[0]==n:
153  #print "found",n,s[3]
154  chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
155  index=0
156  splitForThisWf=None
157  thisLabel=self.speciallabel
158  processStrPrefix=''
159  for step in s[3]:
160 
# An INPUT step (or a non-string command entry) produces no task; it only
# records the input-dataset description for the following step.
161  if 'INPUT' in step or (not isinstance(s[2][index],str)):
162  nextHasDSInput=s[2][index]
163 
164  else:
165 
166  if (index==0):
167  #first step and not input -> gen part
168  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
# NOTE(review): the open() file objects in this method are never explicitly
# closed, and the bare excepts also hide JSON decoding errors.
169  try:
170  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
171  except:
172  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
173  return -15
174 
175  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
176  if not '--relval' in s[2][index]:
177  print 'Impossible to create task from scratch without splitting information with --relval'
178  return -12
179  else:
180  arg=s[2][index].split()
# the --relval value is split on ',': ns[0] -> RequestNumEvents,
# ns[1] -> EventsPerJob. NOTE(review): indexing the result of map() only
# works on Python 2, where map returns a list.
181  ns=map(int,arg[arg.index('--relval')+1].split(','))
182  chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
183  chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
184  if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
185  thisLabel+='_FastSim'
186 
# step right after an INPUT step: read the dataset recorded in nextHasDSInput
187  elif nextHasDSInput:
188  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
189  try:
190  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
191  except:
192  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
193  return -15
194  chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
195  splitForThisWf=nextHasDSInput.split
196  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
197  if step in wmsplit:
198  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
199  # get the run numbers or #events
200  if len(nextHasDSInput.run):
201  chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
202  #print "what is s",s[2][index]
203  if '--data' in s[2][index] and nextHasDSInput.label:
204  thisLabel+='_RelVal_%s'%nextHasDSInput.label
205  if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
206  print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
207  processStrPrefix='PU_'
208  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
209  nextHasDSInput=None
210  else:
211  #not first step and no inputDS
212  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
213  try:
214  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
215  except:
216  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
217  return -15
218  if splitForThisWf:
219  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
220  if step in wmsplit:
221  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
222  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
223  #print step
# settings common to all three kinds of task; the config path stored in
# ConfigCacheID is later replaced by a couch id in upload()
224  chainDict['nowmTasklist'][-1]['TaskName']=step
225  chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
226  chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
227  chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
228  if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
229  chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
230  if '--pileup' in s[2][index]:
231  processStrPrefix='PU_'
232  if acqEra:
233  #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
234  chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
235  chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
236  else:
237  #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
238  chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
239  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
240 
241  index+=1
242 
243  #wrap up for this one
# NOTE(review): this pprint import is only used by commented-out debug prints
244  import pprint
245  #print 'wrapping up'
246  #pprint.pprint(chainDict)
247  #loop on the task list
# For each task declaring a 'primary' input file, look through the earlier
# tasks for an output module listing that file and wire InputTask /
# InputFromOutputModule to it (a later-examined match overwrites an earlier one).
248  for i_second in reversed(range(len(chainDict['nowmTasklist']))):
249  t_second=chainDict['nowmTasklist'][i_second]
250  #print "t_second taskname", t_second['TaskName']
251  if 'primary' in t_second['nowmIO']:
252  #print t_second['nowmIO']['primary']
253  primary=t_second['nowmIO']['primary'][0].replace('file:','')
254  for i_input in reversed(range(0,i_second)):
255  t_input=chainDict['nowmTasklist'][i_input]
256  for (om,o) in t_input['nowmIO'].items():
257  if primary in o:
258  #print "found",primary,"procuced by",om,"of",t_input['TaskName']
259  t_second['InputTask'] = t_input['TaskName']
260  t_second['InputFromOutputModule'] = om
261  #print 't_second',pprint.pformat(t_second)
# a HARVEST task's config goes to the chain-level DQMConfigCacheID
# (merged in from self.defaultHarvest), not into the task dict
262  if t_second['TaskName'].startswith('HARVEST'):
263  chainDict.update(copy.deepcopy(self.defaultHarvest))
264  chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
265  ## the info are not in the task specific dict but in the general dict
266  #t_input.update(copy.deepcopy(self.defaultHarvest))
267  #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
268  break
269 
270  ## there is in fact only one acquisition era
271  #if len(set(chainDict['AcquisitionEra'].values()))==1:
272  # print "setting only one acq"
273  if acqEra:
274  chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
275 
276  ## clean things up now
# Integer entries of self.keep mark tasks (by list position) as KeepOutput.
# With an empty/None self.keep every non-HARVEST task keeps its output;
# otherwise tasks whose TaskName is listed in self.keep do. HARVEST tasks are
# not emitted as TaskN entries; the others lose 'nowmIO' and become Task1..TaskN.
277  itask=0
278  if self.keep:
279  for i in self.keep:
280  if type(i)==int and i < len(chainDict['nowmTasklist']):
281  chainDict['nowmTasklist'][i]['KeepOutput']=True
282  for (i,t) in enumerate(chainDict['nowmTasklist']):
283  if t['TaskName'].startswith('HARVEST'):
284  continue
285  if not self.keep:
286  t['KeepOutput']=True
287  elif t['TaskName'] in self.keep:
288  t['KeepOutput']=True
289  t.pop('nowmIO')
290  itask+=1
291  chainDict['Task%d'%(itask)]=t
292 
293 
294  ##
295 
296 
297  ## provide the number of tasks
298  chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
299 
300  chainDict.pop('nowmTasklist')
301  self.chainDicts[n]=chainDict
302 
303 
304  return 0
305 
def uploadConf(self, filePath, label, where):
    """Upload one configuration file to couch and return its cache id.

    In test mode nothing is uploaded: ``self.count`` is incremented and
    returned as a fake id. Otherwise the file is uploaded once per file name
    (later calls with the same basename reuse the id cached in
    ``self.couchCache``). The upload runs in a separate worker process.

    :param filePath: path of the configuration file; its basename is the
        ``self.couchCache`` key.
    :param label: suffix appended to ``self.label`` to build the couch label.
    :param where: couch URL the file is uploaded to.
    :returns: the couch cache id, or the fake counter value in test mode.

    Exits the process with status -16 if, outside test mode, the wmcontrol
    modules cannot be imported.
    """
    labelInCouch = self.label + '_' + label
    cacheName = filePath.split('/')[-1]
    if self.testMode:
        self.count += 1
        print('\tFake upload of %s to couch with label %s' % (filePath, labelInCouch))
        return self.count

    try:
        # availability check only: the actual upload happens in
        # upload_to_couch_oneArg inside the worker process
        from modules.wma import upload_to_couch, DATABASE_NAME  # noqa: F401
    except ImportError:
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        print('\n\t QUIT\n')
        sys.exit(-16)

    if cacheName in self.couchCache:
        print('Not re-uploading %s to %s for %s' % (filePath, where, label))
        return self.couchCache[cacheName]

    print('Loading %s to %s for %s' % (filePath, where, label))
    ## totally fork the upload to couch to prevent cross loading of process configurations
    pool = multiprocessing.Pool(1)
    try:
        cacheIds = pool.map(upload_to_couch_oneArg,
                            [(filePath, labelInCouch, self.user, self.group, where)])
    finally:
        # release the worker instead of leaking one process per upload
        pool.close()
        pool.join()
    cacheId = cacheIds[0]
    self.couchCache[cacheName] = cacheId
    return cacheId
332 
def upload(self):
    """Replace every local config path in the prepared requests by a couch id.

    For each request in ``self.chainDicts`` (keyed by workflow id), every
    ``TaskN`` entry (the ``TaskChain`` counter excluded) has the config path
    in its ``ConfigCacheID`` uploaded through ``self.uploadConf`` under the
    label ``<workflow id><TaskName>``. A ``DQMConfigCacheID`` entry is
    uploaded under ``<workflow id>harvesting``. The ids returned by
    ``uploadConf`` overwrite the paths in place, and each upload is logged.
    """
    for wf_id, request in self.chainDicts.items():
        wf_tag = str(wf_id)
        for key in request:
            if key == 'DQMConfigCacheID':
                local_path = request[key]
                couch_id = self.uploadConf(local_path,
                                           wf_tag + 'harvesting',
                                           request['CouchURL'])
                print(' '.join(str(part) for part in
                               (local_path, "uploaded to couchDB for", wf_tag, "with ID", couch_id)))
                request[key] = couch_id
            elif key != 'TaskChain' and key.startswith("Task"):
                task = request[key]
                local_path = task['ConfigCacheID']
                couch_id = self.uploadConf(local_path,
                                           wf_tag + task['TaskName'],
                                           request['CouchURL'])
                print(' '.join(str(part) for part in
                               (local_path, " uploaded to couchDB for", wf_tag, "with ID", couch_id)))
                task['ConfigCacheID'] = couch_id
351 
352 
def submit(self):
    """Show, or submit and approve, every prepared request.

    In test mode each request dict of ``self.chainDicts`` is only
    pretty-printed. Otherwise each one is printed, sent to ``self.wmagent``
    with ``makeRequest(..., encodeDict=True)``, approved with
    ``approveRequest``, and followed by ``random_sleep()``.

    Exits the process with status -17 when, outside test mode, the wmcontrol
    modules cannot be imported.
    """
    try:
        from modules.wma import makeRequest, approveRequest
        from wmcontrol import random_sleep
        print('\n\tFound wmcontrol\n')
    except ImportError:
        # in test mode nothing is submitted, so the modules are optional
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-17)

    import pprint
    for (n, d) in self.chainDicts.items():
        if self.testMode:
            print('Only viewing request %s' % (n,))
            # pprint.pprint writes to stdout itself and returns None; the
            # former `print pprint.pprint(d)` emitted a spurious "None" line
            pprint.pprint(d)
            continue
        #submit to wmagent each dict
        print('For eyes before submitting %s' % (n,))
        pprint.pprint(d)
        print('Submitting %s ...........' % (n,))
        workFlow = makeRequest(self.wmagent, d, encodeDict=True)
        approveRequest(self.wmagent, workFlow)
        print('........... %s submitted' % (n,))
        random_sleep()
def replace
Definition: linker.py:10
dictionary map
Definition: Association.py:205
list object
Definition: dbtoconf.py:77
def performInjectionOptionTest
def upload_to_couch_oneArg
double split
Definition: MVATrainer.cc:139