CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Public Attributes
MatrixInjector.MatrixInjector Class Reference
Inheritance diagram for MatrixInjector.MatrixInjector:

Public Member Functions

def __init__
 
def prepare
 
def submit
 
def upload
 
def uploadConf
 

Public Attributes

 chainDicts
 
 couch
 
 couchCache
 
 count
 
 defaultChain
 
 defaultHarvest
 
 defaultInput
 
 defaultScratch
 
 defaultTask
 
 group
 
 keep
 
 label
 
 speciallabel
 
 testMode
 
 user
 
 version
 
 wmagent
 

Detailed Description

Definition at line 36 of file MatrixInjector.py.

Constructor & Destructor Documentation

def MatrixInjector.MatrixInjector.__init__ (   self,
  opt,
  mode = 'init' 
)

Definition at line 38 of file MatrixInjector.py.

38 
39  def __init__(self,opt,mode='init'):
40  self.count=1040
41  self.testMode=((mode!='submit') and (mode!='force'))
42  self.version =1
43  self.keep = opt.keep
44 
45  #wagemt stuff
46  self.wmagent=os.getenv('WMAGENT_REQMGR')
47  if not self.wmagent:
48  self.wmagent = 'cmsweb.cern.ch'
49 
50  #couch stuff
51  self.couch = 'https://'+self.wmagent+'/couchdb'
52 # self.couchDB = 'reqmgr_config_cache'
53  self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
54  self.user = os.getenv('USER')
55  self.group = 'ppd'
56  self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
57  self.speciallabel=''
58  if opt.label:
59  self.speciallabel= '_'+opt.label
60 
61 
62  if not os.getenv('WMCORE_ROOT'):
63  print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
64  if not self.testMode:
65  print '\n\t QUIT\n'
66  sys.exit(-18)
67  else:
68  print '\n\tFound wmclient\n'
69 
70  self.defaultChain={
71  "RequestType" : "TaskChain", #this is how we handle relvals
72  "Requestor": self.user, #Person responsible
73  "Group": self.group, #group for the request
74  "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain)
75  "Campaign": os.getenv('CMSSW_VERSION'), # only for wmstat purpose
76  "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain)
77  "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain)
78  "GlobalTag": None, #Global Tag (overridden per task)
79  "CouchURL": self.couch, #URL of CouchDB containing Config Cache
80  "ConfigCacheURL": self.couch, #URL of CouchDB containing Config Cache
81  "DbsUrl": "https://cmsweb.cern.ch/dbs/prod/global/DBSReader",
82  #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
83  "TaskChain" : None, #Define number of tasks in chain.
84  "nowmTasklist" : [], #a list of tasks as we put them in
85  "unmergedLFNBase" : "/store/unmerged",
86  "mergedLFNBase" : "/store/relval",
87  "dashboardActivity" : "relval",
88  "Memory" : 2400,
89  "SizePerEvent" : 1234,
90  "TimePerEvent" : 20
91  }
92 
93  self.defaultHarvest={
94  "EnableHarvesting" : "True",
95  "DQMUploadUrl" : "https://cmsweb.cern.ch/dqm/relval",
96  "DQMConfigCacheID" : None
97  }
98 
99  self.defaultScratch={
100  "TaskName" : None, #Task Name
101  "ConfigCacheID" : None, #Generator Config id
102  "GlobalTag": None,
103  "SplittingAlgo" : "EventBased", #Splitting Algorithm
104  "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm
105  "RequestNumEvents" : None, #Total number of events to generate
106  "Seeding" : "AutomaticSeeding", #Random seeding method
107  "PrimaryDataset" : None, #Primary Dataset to be created
108  "nowmIO": {},
109  "KeepOutput" : False
110  }
111  self.defaultInput={
112  "TaskName" : "DigiHLT", #Task Name
113  "ConfigCacheID" : None, #Processing Config id
114  "GlobalTag": None,
115  "InputDataset" : None, #Input Dataset to be processed
116  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
117  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
118  "nowmIO": {},
119  "KeepOutput" : False
120  }
121  self.defaultTask={
122  "TaskName" : None, #Task Name
123  "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry)
124  "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process
125  "ConfigCacheID" : None, #Processing Config id
126  "GlobalTag": None,
127  "SplittingAlgo" : "LumiBased", #Splitting Algorithm
128  "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm
129  "nowmIO": {},
130  "KeepOutput" : False
131  }
133  self.chainDicts={}
134 
def replace
Definition: linker.py:10

Member Function Documentation

def MatrixInjector.MatrixInjector.prepare (   self,
  mReader,
  directories,
  mode = 'init' 
)

Definition at line 135 of file MatrixInjector.py.

References python.multivaluedict.append(), MatrixInjector.MatrixInjector.chainDicts, MatrixInjector.MatrixInjector.defaultChain, MatrixInjector.MatrixInjector.defaultHarvest, MatrixInjector.MatrixInjector.defaultInput, MatrixInjector.MatrixInjector.defaultScratch, MatrixInjector.MatrixInjector.defaultTask, MatrixInjector.MatrixInjector.keep, Association.map, SiPixelLorentzAngle_cfi.read, linker.replace(), MatrixInjector.MatrixInjector.speciallabel, split, and makeHLTPrescaleTable.values.

136  def prepare(self,mReader, directories, mode='init'):
137  try:
139  import pprint
140  pprint.pprint(wmsplit)
141  except:
142  print "Not set up for step splitting"
143  wmsplit={}
144 
145  acqEra=False
146  for (n,dir) in directories.items():
147  chainDict=copy.deepcopy(self.defaultChain)
148  print "inspecting",dir
149  nextHasDSInput=None
150  for (x,s) in mReader.workFlowSteps.items():
151  #x has the format (num, prefix)
152  #s has the format (num, name, commands, stepList)
153  if x[0]==n:
154  #print "found",n,s[3]
155  chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
156  index=0
157  splitForThisWf=None
158  thisLabel=self.speciallabel
159  processStrPrefix=''
160  for step in s[3]:
161 
162  if 'INPUT' in step or (not isinstance(s[2][index],str)):
163  nextHasDSInput=s[2][index]
164 
165  else:
166 
167  if (index==0):
168  #first step and not input -> gen part
169  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
170  try:
171  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
172  except:
173  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
174  return -15
175 
176  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
177  if not '--relval' in s[2][index]:
178  print 'Impossible to create task from scratch without splitting information with --relval'
179  return -12
180  else:
181  arg=s[2][index].split()
182  ns=map(int,arg[arg.index('--relval')+1].split(','))
183  chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
184  chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
185  if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
186  thisLabel+='_FastSim'
187 
188  elif nextHasDSInput:
189  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
190  try:
191  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
192  except:
193  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
194  return -15
195  chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
196  splitForThisWf=nextHasDSInput.split
197  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
198  if step in wmsplit:
199  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
200  # get the run numbers or #events
201  if len(nextHasDSInput.run):
202  chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
203  #print "what is s",s[2][index]
204  if '--data' in s[2][index] and nextHasDSInput.label:
205  thisLabel+='_RelVal_%s'%nextHasDSInput.label
206  if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
207  print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
208  processStrPrefix='PU_'
209  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
210  nextHasDSInput=None
211  else:
212  #not first step and no inputDS
213  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
214  try:
215  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
216  except:
217  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
218  return -15
219  if splitForThisWf:
220  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
221  if step in wmsplit:
222  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
223  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
224  #print step
225  chainDict['nowmTasklist'][-1]['TaskName']=step
226  chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
227  chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
228  chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
229  if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
230  chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
231  if '--pileup' in s[2][index]:
232  processStrPrefix='PU_'
233  if acqEra:
234  #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
235  chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
236  chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
237  else:
238  #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
239  chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
240  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
241 
242  index+=1
243 
244  #wrap up for this one
245  import pprint
246  #print 'wrapping up'
247  #pprint.pprint(chainDict)
248  #loop on the task list
249  for i_second in reversed(range(len(chainDict['nowmTasklist']))):
250  t_second=chainDict['nowmTasklist'][i_second]
251  #print "t_second taskname", t_second['TaskName']
252  if 'primary' in t_second['nowmIO']:
253  #print t_second['nowmIO']['primary']
254  primary=t_second['nowmIO']['primary'][0].replace('file:','')
255  for i_input in reversed(range(0,i_second)):
256  t_input=chainDict['nowmTasklist'][i_input]
257  for (om,o) in t_input['nowmIO'].items():
258  if primary in o:
259  #print "found",primary,"procuced by",om,"of",t_input['TaskName']
260  t_second['InputTask'] = t_input['TaskName']
261  t_second['InputFromOutputModule'] = om
262  #print 't_second',pprint.pformat(t_second)
263  if t_second['TaskName'].startswith('HARVEST'):
264  chainDict.update(copy.deepcopy(self.defaultHarvest))
265  chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
266  ## the info are not in the task specific dict but in the general dict
267  #t_input.update(copy.deepcopy(self.defaultHarvest))
268  #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
269  break
270 
271  ## there is in fact only one acquisition era
272  #if len(set(chainDict['AcquisitionEra'].values()))==1:
273  # print "setting only one acq"
274  if acqEra:
275  chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
276 
277  ## clean things up now
278  itask=0
279  if self.keep:
280  for i in self.keep:
281  if type(i)==int and i < len(chainDict['nowmTasklist']):
282  chainDict['nowmTasklist'][i]['KeepOutput']=True
283  for (i,t) in enumerate(chainDict['nowmTasklist']):
284  if t['TaskName'].startswith('HARVEST'):
285  continue
286  if not self.keep:
287  t['KeepOutput']=True
288  elif t['TaskName'] in self.keep:
289  t['KeepOutput']=True
290  t.pop('nowmIO')
291  itask+=1
292  chainDict['Task%d'%(itask)]=t
293 
294 
295  ##
296 
297 
298  ## provide the number of tasks
299  chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
300 
301  chainDict.pop('nowmTasklist')
302  self.chainDicts[n]=chainDict
303 
304 
305  return 0
def replace
Definition: linker.py:10
dictionary map
Definition: Association.py:205
double split
Definition: MVATrainer.cc:139
def MatrixInjector.MatrixInjector.submit (   self)

Definition at line 353 of file MatrixInjector.py.

References MatrixInjector.MatrixInjector.testMode, and MatrixInjector.MatrixInjector.wmagent.

354  def submit(self):
355  try:
356  from modules.wma import makeRequest,approveRequest
357  from wmcontrol import random_sleep
358  print '\n\tFound wmcontrol\n'
359  except:
360  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
361  if not self.testMode:
362  print '\n\t QUIT\n'
363  sys.exit(-17)
364 
365  import pprint
366  for (n,d) in self.chainDicts.items():
367  if self.testMode:
368  print "Only viewing request",n
369  print pprint.pprint(d)
370  else:
371  #submit to wmagent each dict
372  print "For eyes before submitting",n
373  print pprint.pprint(d)
374  print "Submitting",n,"..........."
375  workFlow=makeRequest(self.wmagent,d,encodeDict=True)
376  approveRequest(self.wmagent,workFlow)
377  print "...........",n,"submitted"
378  random_sleep()
def MatrixInjector.MatrixInjector.upload (   self)

Definition at line 333 of file MatrixInjector.py.

References MatrixInjector.MatrixInjector.uploadConf().

334  def upload(self):
335  for (n,d) in self.chainDicts.items():
336  for it in d:
337  if it.startswith("Task") and it!='TaskChain':
338  #upload
339  couchID=self.uploadConf(d[it]['ConfigCacheID'],
340  str(n)+d[it]['TaskName'],
341  d['CouchURL']
342  )
343  print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID
344  d[it]['ConfigCacheID']=couchID
345  if it =='DQMConfigCacheID':
346  couchID=self.uploadConf(d['DQMConfigCacheID'],
347  str(n)+'harvesting',
348  d['CouchURL']
349  )
350  print d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
351  d['DQMConfigCacheID']=couchID
352 
def MatrixInjector.MatrixInjector.uploadConf (   self,
  filePath,
  label,
  where 
)

Definition at line 306 of file MatrixInjector.py.

References MatrixInjector.MatrixInjector.couchCache, TmCcu.count, TmApvPair.count, TmModule.count, TmPsu.count, MatrixInjector.MatrixInjector.count, ValidationMisalignedTracker.count, SiStripDetSummary::Values.count, web_results_display.WebResultsDisplay.count, MD5.count, MatrixInjector.MatrixInjector.group, ElectronLikelihoodCategoryData.label, SiPixelFedFillerWordEventNumber.label, HcalLutSet.label, TtEvent::HypoClassKeyStringToEnum.label, L1GtBoardTypeStringToEnum.label, DTResidualCalibration.DTResidualCalibration.label, DTTTrigValid.DTTTrigValid.label, DTDQMHarvesting.DTDQMHarvesting.label, DTTTrigResidualCorr.DTTTrigResidualCorr.label, MatrixInjector.MatrixInjector.label, L1GtPsbQuadStringToEnum.label, ValidationMisalignedTracker.label, L1GtConditionTypeStringToEnum.label, L1GtConditionCategoryStringToEnum.label, PhysicsTools::Calibration::Comparator.label, MatrixInjector.MatrixInjector.testMode, EcalTPGParamReaderFromDB.user, popcon::RpcDataV.user, popcon::RpcDataT.user, popcon::RpcObGasData.user, popcon::RPCObPVSSmapData.user, popcon::RpcDataI.user, popcon::RpcDataS.user, popcon::RpcDataUXC.user, popcon::RpcDataGasMix.user, popcon::RpcDataFebmap.user, and MatrixInjector.MatrixInjector.user.

Referenced by MatrixInjector.MatrixInjector.upload().

307  def uploadConf(self,filePath,label,where):
308  labelInCouch=self.label+'_'+label
309  cacheName=filePath.split('/')[-1]
310  if self.testMode:
311  self.count+=1
312  print '\tFake upload of',filePath,'to couch with label',labelInCouch
313  return self.count
314  else:
315  try:
316  from modules.wma import upload_to_couch,DATABASE_NAME
317  except:
318  print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
319  print '\n\t QUIT\n'
320  sys.exit(-16)
321 
322  if cacheName in self.couchCache:
323  print "Not re-uploading",filePath,"to",where,"for",label
324  cacheId=self.couchCache[cacheName]
325  else:
326  print "Loading",filePath,"to",where,"for",label
327  ## totally fork the upload to couch to prevent cross loading of process configurations
328  pool = multiprocessing.Pool(1)
329  cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
330  cacheId = cacheIds[0]
331  self.couchCache[cacheName]=cacheId
332  return cacheId

Member Data Documentation

MatrixInjector.MatrixInjector.chainDicts

Definition at line 132 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.couch

Definition at line 50 of file MatrixInjector.py.

MatrixInjector.MatrixInjector.couchCache

Definition at line 52 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.count

Definition at line 39 of file MatrixInjector.py.

Referenced by NodeCut.MatchLessSeen.match(), NodeCut.MatchLessHit.match(), and MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.defaultChain

Definition at line 69 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultHarvest

Definition at line 92 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultInput

Definition at line 110 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultScratch

Definition at line 98 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultTask

Definition at line 120 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.group

Definition at line 54 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.keep

Definition at line 42 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.label

Definition at line 55 of file MatrixInjector.py.

Referenced by Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor._sort_list(), python.rootplot.root2matplotlib.Hist.bar(), python.rootplot.root2matplotlib.Hist.barh(), python.rootplot.root2matplotlib.Hist.errorbar(), python.rootplot.root2matplotlib.Hist.errorbarh(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.foundIn(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.fullFilename(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.inputEventContent(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.outputEventContent(), Vispa.Plugins.ConfigEditor.ToolDataAccessor.ToolDataAccessor.properties(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor.properties(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.properties(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.readConnections(), Vispa.Plugins.ConfigEditor.ToolDataAccessor.ToolDataAccessor.updateProcess(), MatrixInjector.MatrixInjector.uploadConf(), and Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.usedBy().

MatrixInjector.MatrixInjector.speciallabel

Definition at line 56 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.testMode

Definition at line 40 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.submit(), and MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.user

Definition at line 53 of file MatrixInjector.py.

Referenced by cmsPerfSuite.PerfSuite.optionParse(), and MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.version

Definition at line 41 of file MatrixInjector.py.

Referenced by argparse._VersionAction.__call__(), python.rootplot.argparse._VersionAction.__call__(), argparse.ArgumentParser.__init__(), argparse.ArgumentParser.format_version(), and python.rootplot.argparse.ArgumentParser.format_version().

MatrixInjector.MatrixInjector.wmagent

Definition at line 45 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.submit().