CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MatrixInjector.py
Go to the documentation of this file.
1 import sys
2 import json
3 import os
4 import copy
5 import multiprocessing
6 
8  if opt.show:
9  print 'Not injecting to wmagent in --show mode. Need to run the worklfows.'
10  sys.exit(-1)
11  if opt.wmcontrol=='init':
12  #init means it'll be in test mode
13  opt.nThreads=0
14  if opt.wmcontrol=='test':
15  #means the wf were created already, and we just dryRun it.
16  opt.dryRun=True
17  if opt.wmcontrol=='submit' and opt.nThreads==0:
18  print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.'
19  sys.exit(-1)
20  if opt.wmcontrol=='force':
21  print "This is an expert setting, you'd better know what you're doing"
22  opt.dryRun=True
23 
def upload_to_couch_oneArg(arguments):
    """Unpack one argument tuple and forward it to upload_to_couch.

    Exists so the upload can be dispatched through multiprocessing.Pool.map,
    which hands each worker exactly one picklable argument.
    Returns the couch cache id produced by upload_to_couch.
    """
    from modules.wma import upload_to_couch

    file_path, label_in_couch, user, group, where = arguments
    return upload_to_couch(file_path,
                           label_in_couch,
                           user,
                           group,
                           test_mode=False,
                           url=where)
34 
35 
37 
    def __init__(self,opt,mode='init'):
        """Collect wmagent/couchdb connection settings and request templates.

        opt: parsed command-line options; reads opt.keep and opt.label.
        mode: 'submit' or 'force' disable test mode; anything else keeps it on.
        """
        # counter used to hand out fake config-cache ids in test mode
        self.count=1040
        self.testMode=((mode!='submit') and (mode!='force'))
        self.version =1
        # list of task indices/names whose output should be kept (see prepare)
        self.keep = opt.keep

        #wagemt stuff
        self.wmagent=os.getenv('WMAGENT_REQMGR')
        if not self.wmagent:
            self.wmagent = 'cmsweb.cern.ch'

        #couch stuff
        self.couch = 'https://'+self.wmagent+'/couchdb'
#        self.couchDB = 'reqmgr_config_cache'
        self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
        self.user = os.getenv('USER')
        self.group = 'ppd'
        self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
        self.speciallabel=''
        if opt.label:
            self.speciallabel= '_'+opt.label


        if not os.getenv('WMCORE_ROOT'):
            print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
            if not self.testMode:
                print '\n\t QUIT\n'
                sys.exit(-18)
        else:
            print '\n\tFound wmclient\n'

        # template for the overall TaskChain request document
        self.defaultChain={
            "RequestType" : "TaskChain",                    #this is how we handle relvals
            "Requestor": self.user,                         #Person responsible
            "Group": self.group,                            #group for the request
            "CMSSWVersion": os.getenv('CMSSW_VERSION'),     #CMSSW Version (used for all tasks in chain)
            "Campaign": os.getenv('CMSSW_VERSION'),         # only for wmstat purpose
            "ScramArch": os.getenv('SCRAM_ARCH'),           #Scram Arch (used for all tasks in chain)
            "ProcessingVersion": self.version,              #Processing Version (used for all tasks in chain)
            "GlobalTag": None,                              #Global Tag (overridden per task)
            "CouchURL": self.couch,                         #URL of CouchDB containing Config Cache
            "ConfigCacheURL": self.couch,                   #URL of CouchDB containing Config Cache
            "DbsUrl": "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet",
            #"CouchDBName": self.couchDB,                   #Name of Couch Database containing config cache
            #- Will contain all configs for all Tasks
            "SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
            "TaskChain" : None,                             #Define number of tasks in chain.
            "nowmTasklist" : [],                            #a list of tasks as we put them in
            "unmergedLFNBase" : "/store/unmerged",
            "mergedLFNBase" : "/store/relval",
            "dashboardActivity" : "relval",
            "Memory" : 2400,
            "SizePerEvent" : 1234,
            "TimePerEvent" : 20
            }

        # NOTE(review): the opening line of this assignment was lost in this
        # view of the file; the name defaultHarvest is taken from its use in
        # prepare() — confirm against the original source.
        self.defaultHarvest={
            "EnableHarvesting" : "True",
            "DQMUploadUrl" : "https://cmsweb.cern.ch/dqm/relval",
            "DQMConfigCacheID" : None
            }

        # NOTE(review): opening line lost in this view; the name
        # defaultScratch is taken from its use in prepare() — confirm.
        # Template for a generator (first, no-input) task.
        self.defaultScratch={
            "TaskName" : None,                  #Task Name
            "ConfigCacheID" : None,             #Generator Config id
            "GlobalTag": None,
            "SplittingAlgo"  : "EventBased",    #Splitting Algorithm
            "EventsPerJob" : None,              #Size of jobs in terms of splitting algorithm
            "RequestNumEvents" : None,          #Total number of events to generate
            "Seeding" : "AutomaticSeeding",     #Random seeding method
            "PrimaryDataset" : None,            #Primary Dataset to be created
            "nowmIO": {},
            "KeepOutput" : False
            }
        # NOTE(review): opening line lost in this view; the name defaultInput
        # is taken from its use in prepare() — confirm.
        # Template for a task reading an existing input dataset.
        self.defaultInput={
            "TaskName" : "DigiHLT",             #Task Name
            "ConfigCacheID" : None,             #Processing Config id
            "GlobalTag": None,
            "InputDataset" : None,              #Input Dataset to be processed
            "SplittingAlgo"  : "LumiBased",     #Splitting Algorithm
            "LumisPerJob" : 10,                 #Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "KeepOutput" : False
            }
        # Template for an intermediate task fed by a previous task's output.
        self.defaultTask={
            "TaskName" : None,                  #Task Name
            "InputTask" : None,                 #Input Task Name (Task Name field of a previous Task entry)
            "InputFromOutputModule" : None,     #OutputModule name in the input task that will provide files to process
            "ConfigCacheID" : None,             #Processing Config id
            "GlobalTag": None,
            "SplittingAlgo"  : "LumiBased",     #Splitting Algorithm
            "LumisPerJob" : 10,                 #Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "KeepOutput" : False
            }

        # workflow-number -> fully assembled request dict, filled by prepare()
        self.chainDicts={}
135 
136 
    def prepare(self,mReader, directories, mode='init'):
        """Assemble one TaskChain request dict per workflow.

        mReader: matrix reader whose workFlowSteps maps (num, prefix) ->
                 (num, name, commands, stepList).
        directories: maps workflow number -> directory holding the per-step
                 .io files produced when the workflows were run.
        Fills self.chainDicts[n] for each workflow n and returns 0, or a
        negative error code (-12 missing --relval splitting, -15 missing .io
        file) on failure.
        """
        try:
            # NOTE(review): the line defining wmsplit appears to be missing
            # from this view of the file; if it is genuinely undefined here,
            # the NameError below is caught and wmsplit falls back to {}.
            import pprint
            pprint.pprint(wmsplit)
        except:
            print "Not set up for step splitting"
            wmsplit={}

        acqEra=False
        for (n,dir) in directories.items():
            # start every workflow from a fresh copy of the request template
            chainDict=copy.deepcopy(self.defaultChain)
            print "inspecting",dir
            nextHasDSInput=None
            for (x,s) in mReader.workFlowSteps.items():
                #x has the format (num, prefix)
                #s has the format (num, name, commands, stepList)
                if x[0]==n:
                    #print "found",n,s[3]
                    chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                    index=0
                    splitForThisWf=None
                    thisLabel=self.speciallabel
                    processStrPrefix=''
                    for step in s[3]:

                        # an INPUT step (or a non-string command object) only
                        # records the dataset to feed into the next real step
                        if 'INPUT' in step or (not isinstance(s[2][index],str)):
                            nextHasDSInput=s[2][index]

                        else:

                            if (index==0):
                                #first step and not input -> gen part
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
                                    return -15

                                chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
                                if not '--relval' in s[2][index]:
                                    print 'Impossible to create task from scratch without splitting information with --relval'
                                    return -12
                                else:
                                    # --relval carries "<total events>,<events per job>"
                                    arg=s[2][index].split()
                                    ns=map(int,arg[arg.index('--relval')+1].split(','))
                                    chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
                                    chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
                                if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
                                    thisLabel+='_FastSim'

                            elif nextHasDSInput:
                                # step fed by a real input dataset recorded above
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
                                    return -15
                                chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
                                splitForThisWf=nextHasDSInput.split
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                                if step in wmsplit:
                                    # per-step override wins over the dataset default
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
                                # get the run numbers or #events
                                if len(nextHasDSInput.run):
                                    chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
                                #print "what is s",s[2][index]
                                if '--data' in s[2][index] and nextHasDSInput.label:
                                    thisLabel+='_RelVal_%s'%nextHasDSInput.label
                                if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
                                    print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
                                    processStrPrefix='PU_'
                                    chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
                                nextHasDSInput=None
                            else:
                                #not first step and no inputDS
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
                                try:
                                    chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                                except:
                                    print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
                                    return -15
                                if splitForThisWf:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                                if step in wmsplit:
                                    chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]

                            #print step
                            chainDict['nowmTasklist'][-1]['TaskName']=step
                            # the "config id" is the local .py path until uploadConf
                            # swaps it for a real couch id
                            chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
                            chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
                            chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
                            if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
                                chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
                            if '--pileup' in s[2][index]:
                                processStrPrefix='PU_'
                            if acqEra:
                                #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                                chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
                                chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
                            else:
                                #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                                chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
                                chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel

                        index+=1

            #wrap up for this one
            import pprint
            #print 'wrapping up'
            #pprint.pprint(chainDict)
            #loop on the task list
            # walk the tasks backwards and wire each one's input to the earlier
            # task whose output module produced its primary dataset
            for i_second in reversed(range(len(chainDict['nowmTasklist']))):
                t_second=chainDict['nowmTasklist'][i_second]
                #print "t_second taskname", t_second['TaskName']
                if 'primary' in t_second['nowmIO']:
                    #print t_second['nowmIO']['primary']
                    primary=t_second['nowmIO']['primary'][0].replace('file:','')
                    for i_input in reversed(range(0,i_second)):
                        t_input=chainDict['nowmTasklist'][i_input]
                        for (om,o) in t_input['nowmIO'].items():
                            if primary in o:
                                #print "found",primary,"procuced by",om,"of",t_input['TaskName']
                                t_second['InputTask'] = t_input['TaskName']
                                t_second['InputFromOutputModule'] = om
                                #print 't_second',pprint.pformat(t_second)
                                if t_second['TaskName'].startswith('HARVEST'):
                                    # harvest settings live in the top-level dict,
                                    # not in the task-specific one
                                    chainDict.update(copy.deepcopy(self.defaultHarvest))
                                    chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
                                    ## the info are not in the task specific dict but in the general dict
                                    #t_input.update(copy.deepcopy(self.defaultHarvest))
                                    #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
                                break

            ## there is in fact only one acquisition era
            #if len(set(chainDict['AcquisitionEra'].values()))==1:
            #    print "setting only one acq"
            if acqEra:
                chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]

            ## clean things up now
            itask=0
            if self.keep:
                for i in self.keep:
                    if type(i)==int and i < len(chainDict['nowmTasklist']):
                        chainDict['nowmTasklist'][i]['KeepOutput']=True
            for (i,t) in enumerate(chainDict['nowmTasklist']):
                if t['TaskName'].startswith('HARVEST'):
                    continue
                if not self.keep:
                    t['KeepOutput']=True
                elif t['TaskName'] in self.keep:
                    t['KeepOutput']=True
                # nowmIO was bookkeeping only; it must not go into the request
                t.pop('nowmIO')
                itask+=1
                chainDict['Task%d'%(itask)]=t


            ##


            ## provide the number of tasks
            chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])

            chainDict.pop('nowmTasklist')
            self.chainDicts[n]=chainDict


        return 0
307 
308  def uploadConf(self,filePath,label,where):
309  labelInCouch=self.label+'_'+label
310  cacheName=filePath.split('/')[-1]
311  if self.testMode:
312  self.count+=1
313  print '\tFake upload of',filePath,'to couch with label',labelInCouch
314  return self.count
315  else:
316  try:
317  from modules.wma import upload_to_couch,DATABASE_NAME
318  except:
319  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
320  print '\n\t QUIT\n'
321  sys.exit(-16)
322 
323  if cacheName in self.couchCache:
324  print "Not re-uploading",filePath,"to",where,"for",label
325  cacheId=self.couchCache[cacheName]
326  else:
327  print "Loading",filePath,"to",where,"for",label
328  ## totally fork the upload to couch to prevent cross loading of process configurations
329  pool = multiprocessing.Pool(1)
330  cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
331  cacheId = cacheIds[0]
332  self.couchCache[cacheName]=cacheId
333  return cacheId
334 
335  def upload(self):
336  for (n,d) in self.chainDicts.items():
337  for it in d:
338  if it.startswith("Task") and it!='TaskChain':
339  #upload
340  couchID=self.uploadConf(d[it]['ConfigCacheID'],
341  str(n)+d[it]['TaskName'],
342  d['CouchURL']
343  )
344  print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID
345  d[it]['ConfigCacheID']=couchID
346  if it =='DQMConfigCacheID':
347  couchID=self.uploadConf(d['DQMConfigCacheID'],
348  str(n)+'harvesting',
349  d['CouchURL']
350  )
351  print d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
352  d['DQMConfigCacheID']=couchID
353 
354 
355  def submit(self):
356  try:
357  from modules.wma import makeRequest,approveRequest
358  from wmcontrol import random_sleep
359  print '\n\tFound wmcontrol\n'
360  except:
361  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
362  if not self.testMode:
363  print '\n\t QUIT\n'
364  sys.exit(-17)
365 
366  import pprint
367  for (n,d) in self.chainDicts.items():
368  if self.testMode:
369  print "Only viewing request",n
370  print pprint.pprint(d)
371  else:
372  #submit to wmagent each dict
373  print "For eyes before submitting",n
374  print pprint.pprint(d)
375  print "Submitting",n,"..........."
376  workFlow=makeRequest(self.wmagent,d,encodeDict=True)
377  approveRequest(self.wmagent,workFlow)
378  print "...........",n,"submitted"
379  random_sleep()
def replace
Definition: linker.py:10
dictionary map
Definition: Association.py:196
list object
Definition: dbtoconf.py:77
def performInjectionOptionTest
def upload_to_couch_oneArg
double split
Definition: MVATrainer.cc:139