CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Public Attributes
MatrixInjector.MatrixInjector Class Reference
Inheritance diagram for MatrixInjector.MatrixInjector:

Public Member Functions

def __init__
 
def prepare
 
def submit
 
def upload
 
def uploadConf
 

Public Attributes

 chainDicts
 
 couch
 
 couchCache
 
 count
 
 defaultChain
 
 defaultHarvest
 
 defaultInput
 
 defaultScratch
 
 defaultTask
 
 group
 
 keep
 
 label
 
 speciallabel
 
 testMode
 
 user
 
 version
 
 wmagent
 

Detailed Description

Definition at line 36 of file MatrixInjector.py.

Constructor & Destructor Documentation

def MatrixInjector.MatrixInjector.__init__ (   self,
  opt,
  mode = 'init' 
)

Definition at line 38 of file MatrixInjector.py.

38 
def __init__(self,opt,mode='init'):
    """Set up the injector state and the request/task template dictionaries.

    Parameters:
      opt  -- options object; only ``opt.keep`` and ``opt.label`` are read.
      mode -- 'submit' and 'force' switch test mode off; any other value
              (default 'init') leaves the injector in test mode.

    Environment variables read: WMAGENT_REQMGR, USER, CMSSW_VERSION,
    SCRAM_ARCH and WMCORE_ROOT.

    Raises:
      RuntimeError -- CMSSW_VERSION is not set (previously this surfaced as
                      an AttributeError on None while building the label).
      SystemExit   -- exit code -18 when WMCORE_ROOT is not set and test
                      mode is off.
    """
    self.count=1040
    self.testMode=((mode!='submit') and (mode!='force'))
    self.version =1
    self.keep = opt.keep

    #wmagent stuff
    self.wmagent=os.getenv('WMAGENT_REQMGR')
    if not self.wmagent:
        self.wmagent = 'cmsweb.cern.ch'

    # Read the release name once; it is needed for the label and for the
    # request template, and a missing value must fail with a clear message.
    cmsswVersion = os.getenv('CMSSW_VERSION')
    if cmsswVersion is None:
        raise RuntimeError('CMSSW_VERSION is not set: a CMSSW environment is needed to build requests')

    #couch stuff
    self.couch = 'https://'+self.wmagent+'/couchdb'
#    self.couchDB = 'reqmgr_config_cache'
    self.couchCache={} # so that we do not upload like crazy, and recycle cfgs
    self.user = os.getenv('USER')
    self.group = 'ppd'
    self.label = 'RelValSet_'+cmsswVersion.replace('-','')+'_v'+str(self.version)
    self.speciallabel=''
    if opt.label:
        self.speciallabel= '_'+opt.label

    # Single-argument prints: the parenthesised form emits exactly the same
    # text as the print statement.
    if not os.getenv('WMCORE_ROOT'):
        print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-18)
    else:
        print('\n\tFound wmclient\n')

    # Template for a whole request; deep-copied once per workflow in prepare().
    self.defaultChain={
        "RequestType" : "TaskChain",                    #this is how we handle relvals
        "SubRequestType" : "RelVal",                    #this is how we handle relvals, now that TaskChain is also used for central MC production
        "RequestPriority": 999999,
        "Requestor": self.user,                         #Person responsible
        "Group": self.group,                            #group for the request
        "CMSSWVersion": cmsswVersion,                   #CMSSW Version (used for all tasks in chain)
        "Campaign": cmsswVersion,                       # only for wmstat purpose
        "ScramArch": os.getenv('SCRAM_ARCH'),           #Scram Arch (used for all tasks in chain)
        "ProcessingVersion": self.version,              #Processing Version (used for all tasks in chain)
        "GlobalTag": None,                              #Global Tag (overridden per task)
        "CouchURL": self.couch,                         #URL of CouchDB containing Config Cache
        "ConfigCacheURL": self.couch,                   #URL of CouchDB containing Config Cache
        "DbsUrl": "https://cmsweb.cern.ch/dbs/prod/global/DBSReader",
        #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist
        "TaskChain" : None,                             #Define number of tasks in chain.
        "nowmTasklist" : [],                            #a list of tasks as we put them in
        "unmergedLFNBase" : "/store/unmerged",
        "mergedLFNBase" : "/store/relval",
        "dashboardActivity" : "relval",
        "Memory" : 2400,
        "SizePerEvent" : 1234,
        "TimePerEvent" : 1
        }

    # Keys merged into the request dictionary when a HARVEST task is found.
    self.defaultHarvest={
        "EnableHarvesting" : "True",
        "DQMUploadUrl" : "https://cmsweb.cern.ch/dqm/relval",
        "DQMConfigCacheID" : None
        }

    # Template for a first task that has no dataset input.
    self.defaultScratch={
        "TaskName" : None,                              #Task Name
        "ConfigCacheID" : None,                         #Generator Config id
        "GlobalTag": None,
        "SplittingAlgo" : "EventBased",                 #Splitting Algorithm
        "EventsPerJob" : None,                          #Size of jobs in terms of splitting algorithm
        "RequestNumEvents" : None,                      #Total number of events to generate
        "Seeding" : "AutomaticSeeding",                 #Random seeding method
        "PrimaryDataset" : None,                        #Primary Dataset to be created
        "nowmIO": {},
        "KeepOutput" : False
        }
    # Template for a task that reads an input dataset.
    self.defaultInput={
        "TaskName" : "DigiHLT",                         #Task Name
        "ConfigCacheID" : None,                         #Processing Config id
        "GlobalTag": None,
        "InputDataset" : None,                          #Input Dataset to be processed
        "SplittingAlgo" : "LumiBased",                  #Splitting Algorithm
        "LumisPerJob" : 10,                             #Size of jobs in terms of splitting algorithm
        "nowmIO": {},
        "KeepOutput" : False
        }
    # Template for a task that reads the output of a previous task.
    self.defaultTask={
        "TaskName" : None,                              #Task Name
        "InputTask" : None,                             #Input Task Name (Task Name field of a previous Task entry)
        "InputFromOutputModule" : None,                 #OutputModule name in the input task that will provide files to process
        "ConfigCacheID" : None,                         #Processing Config id
        "GlobalTag": None,
        "SplittingAlgo" : "LumiBased",                  #Splitting Algorithm
        "LumisPerJob" : 10,                             #Size of jobs in terms of splitting algorithm
        "nowmIO": {},
        "KeepOutput" : False
        }

    # Filled by prepare(): one request dictionary per workflow key.
    self.chainDicts={}
136 

Member Function Documentation

def MatrixInjector.MatrixInjector.prepare (   self,
  mReader,
  directories,
  mode = 'init' 
)

Definition at line 137 of file MatrixInjector.py.

References python.multivaluedict.append(), MatrixInjector.MatrixInjector.chainDicts, MatrixInjector.MatrixInjector.defaultChain, MatrixInjector.MatrixInjector.defaultHarvest, MatrixInjector.MatrixInjector.defaultInput, MatrixInjector.MatrixInjector.defaultScratch, MatrixInjector.MatrixInjector.defaultTask, MatrixInjector.MatrixInjector.keep, python.multivaluedict.map(), SiPixelLorentzAngle_cfi.read, python.rootplot.root2matplotlib.replace(), MatrixInjector.MatrixInjector.speciallabel, split, and makeHLTPrescaleTable.values.

138  def prepare(self,mReader, directories, mode='init'):
# Build one request dictionary per entry of `directories` and store it in
# self.chainDicts under the same key.
#
# Parameters (as used below):
#   mReader     -- object exposing `workFlowSteps`, a dict whose keys are
#                  indexed with [0] and whose values are indexed with
#                  [1] (name), [2] (commands) and [3] (step list).
#   directories -- dict mapping a workflow key to the directory that holds
#                  the `<step>.io` and `<step>.py` files.
#   mode        -- not read anywhere in this method.
#
# Returns 0 on success, -15 when a `<dir>/<step>.io` file cannot be opened
# or parsed as JSON, and -12 when a first from-scratch step has no
# `--relval` option in its command.
#
# NOTE(review): indentation is lost in this listing; the nesting described
# in these comments is inferred from the if/elif/else/for keywords --
# confirm against MatrixInjector.py.
# NOTE(review): the listing jumps from 139 to 141; the statement that binds
# `wmsplit` inside the try is presumably on the omitted line -- confirm.
139  try:
141  import pprint
142  pprint.pprint(wmsplit)
143  except:
144  print "Not set up for step splitting"
145  wmsplit={}
146 
# acqEra is set to False here and never reassigned in the visible code, so
# the `if acqEra:` branches below are not taken as shown.
147  acqEra=False
148  for (n,dir) in directories.items():
149  chainDict=copy.deepcopy(self.defaultChain)
150  print "inspecting",dir
151  nextHasDSInput=None
152  for (x,s) in mReader.workFlowSteps.items():
153  #x has the format (num, prefix)
154  #s has the format (num, name, commands, stepList)
155  if x[0]==n:
156  #print "found",n,s[3]
157  chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
158  index=0
159  splitForThisWf=None
160  thisLabel=self.speciallabel
161  processStrPrefix=''
162  for step in s[3]:
163 
# A step whose name contains 'INPUT', or whose command is not a plain
# string, is remembered in nextHasDSInput for a later step.
164  if 'INPUT' in step or (not isinstance(s[2][index],str)):
165  nextHasDSInput=s[2][index]
166 
167  else:
168 
# Three task templates: defaultScratch for the first step, defaultInput when
# a dataset input was remembered, defaultTask otherwise.
169  if (index==0):
170  #first step and not input -> gen part
171  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
172  try:
173  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
174  except:
175  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
176  return -15
177 
178  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
179  if not '--relval' in s[2][index]:
180  print 'Impossible to create task from scratch without splitting information with --relval'
181  return -12
182  else:
# The token following '--relval' is parsed as two comma-separated integers:
# RequestNumEvents, then EventsPerJob.
183  arg=s[2][index].split()
184  ns=map(int,arg[arg.index('--relval')+1].split(','))
185  chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
186  chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
187  if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
188  thisLabel+='_FastSim'
189 
190  elif nextHasDSInput:
191  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
192  try:
193  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
194  except:
195  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
196  return -15
197  chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
198  splitForThisWf=nextHasDSInput.split
199  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
200  if step in wmsplit:
201  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
202  # get the run numbers or #events
203  if len(nextHasDSInput.run):
204  chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
205  #print "what is s",s[2][index]
206  if '--data' in s[2][index] and nextHasDSInput.label:
207  thisLabel+='_RelVal_%s'%nextHasDSInput.label
208  if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
209  print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
210  processStrPrefix='PU_'
211  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
212  nextHasDSInput=None
213  else:
214  #not first step and no inputDS
215  chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
216  try:
217  chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
218  except:
219  print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created"
220  return -15
221  if splitForThisWf:
222  chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
223  if step in wmsplit:
224  chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
225  chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
226  #print step
# ConfigCacheID holds the local path of the step configuration at this point.
227  chainDict['nowmTasklist'][-1]['TaskName']=step
228  chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
229  chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
230  chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
231  if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
232  chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
233  if '--pileup' in s[2][index]:
234  processStrPrefix='PU_'
235  if acqEra:
236  #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
237  chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
238  chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
239  else:
240  #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
241  chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
242  chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
243 
244  index+=1
245 
246  #wrap up for this one
247  import pprint
248  #print 'wrapping up'
249  #pprint.pprint(chainDict)
250  #loop on the task list
# Walk the tasks from last to first; for a task with a 'primary' entry in its
# nowmIO, search the earlier tasks (nearest first) for an output whose value
# contains that file name and record it as InputTask / InputFromOutputModule.
251  for i_second in reversed(range(len(chainDict['nowmTasklist']))):
252  t_second=chainDict['nowmTasklist'][i_second]
253  #print "t_second taskname", t_second['TaskName']
254  if 'primary' in t_second['nowmIO']:
255  #print t_second['nowmIO']['primary']
256  primary=t_second['nowmIO']['primary'][0].replace('file:','')
257  for i_input in reversed(range(0,i_second)):
258  t_input=chainDict['nowmTasklist'][i_input]
259  for (om,o) in t_input['nowmIO'].items():
260  if primary in o:
261  #print "found",primary,"procuced by",om,"of",t_input['TaskName']
262  t_second['InputTask'] = t_input['TaskName']
263  t_second['InputFromOutputModule'] = om
264  #print 't_second',pprint.pformat(t_second)
# A task whose name starts with 'HARVEST' adds the defaultHarvest keys to the
# request dictionary itself and records its config in DQMConfigCacheID.
265  if t_second['TaskName'].startswith('HARVEST'):
266  chainDict.update(copy.deepcopy(self.defaultHarvest))
267  chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
268  ## the info are not in the task specific dict but in the general dict
269  #t_input.update(copy.deepcopy(self.defaultHarvest))
270  #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
271  break
272 
273  ## there is in fact only one acquisition era
274  #if len(set(chainDict['AcquisitionEra'].values()))==1:
275  # print "setting only one acq"
276  if acqEra:
277  chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
278 
279  ## clean things up now
# self.keep entries that are ints select tasks by position; other entries are
# matched against TaskName. When self.keep is falsy every non-HARVEST task
# gets KeepOutput=True. HARVEST tasks are not numbered as Task<N>.
280  itask=0
281  if self.keep:
282  for i in self.keep:
283  if type(i)==int and i < len(chainDict['nowmTasklist']):
284  chainDict['nowmTasklist'][i]['KeepOutput']=True
285  for (i,t) in enumerate(chainDict['nowmTasklist']):
286  if t['TaskName'].startswith('HARVEST'):
287  continue
288  if not self.keep:
289  t['KeepOutput']=True
290  elif t['TaskName'] in self.keep:
291  t['KeepOutput']=True
292  t.pop('nowmIO')
293  itask+=1
294  chainDict['Task%d'%(itask)]=t
295 
296 
297  ##
298 
299 
300  ## provide the number of tasks
301  chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
302 
303  chainDict.pop('nowmTasklist')
304  self.chainDicts[n]=chainDict
305 
306 
307  return 0
double split
Definition: MVATrainer.cc:139
def MatrixInjector.MatrixInjector.submit (   self)

Definition at line 355 of file MatrixInjector.py.

References MatrixInjector.MatrixInjector.testMode, and MatrixInjector.MatrixInjector.wmagent.

def submit(self):
    """Show every prepared request and, outside test mode, submit it.

    Reads self.chainDicts, self.testMode and self.wmagent. In test mode each
    request dictionary is only pretty-printed. Otherwise it is pretty-printed,
    passed to makeRequest, the returned workflow is passed to approveRequest,
    and random_sleep() is called before moving on to the next request.

    Raises:
      SystemExit -- exit code -17 when the wmcontrol modules cannot be
                    imported and test mode is off.
    """
    try:
        from modules.wma import makeRequest,approveRequest
        from wmcontrol import random_sleep
        print('\n\tFound wmcontrol\n')
    except Exception:
        # Not a bare except: Ctrl-C and sys.exit must not be swallowed here.
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-17)

    import pprint
    for (n,d) in self.chainDicts.items():
        if self.testMode:
            print("Only viewing request %s" % (n,))
            # pprint.pprint writes to stdout itself and returns None, so it
            # must not be wrapped in print (that emitted a stray "None").
            pprint.pprint(d)
        else:
            #submit to wmagent each dict
            print("For eyes before submitting %s" % (n,))
            pprint.pprint(d)
            print("Submitting %s ..........." % (n,))
            workFlow=makeRequest(self.wmagent,d,encodeDict=True)
            approveRequest(self.wmagent,workFlow)
            print("........... %s submitted" % (n,))
            random_sleep()
def MatrixInjector.MatrixInjector.upload (   self)

Definition at line 335 of file MatrixInjector.py.

References MatrixInjector.MatrixInjector.uploadConf().

def upload(self):
    """Upload the configuration of every task in every prepared request.

    For each request in self.chainDicts, every key starting with "Task"
    (except 'TaskChain') has its 'ConfigCacheID' passed to self.uploadConf
    and replaced by the returned id; the 'DQMConfigCacheID' entry, when
    present, is handled the same way with the label suffix 'harvesting'.
    """
    for wfKey, request in self.chainDicts.items():
        # Only existing keys are reassigned, so the dictionary keeps its
        # size while it is being iterated.
        for key in request:
            if key == 'DQMConfigCacheID':
                harvestConf = request['DQMConfigCacheID']
                couchID = self.uploadConf(harvestConf, str(wfKey)+'harvesting', request['CouchURL'])
                print(' '.join(str(piece) for piece in (harvestConf, "uploaded to couchDB for", str(wfKey), "with ID", couchID)))
                request['DQMConfigCacheID'] = couchID
                continue
            if not key.startswith("Task") or key == 'TaskChain':
                continue
            #upload
            task = request[key]
            taskConf = task['ConfigCacheID']
            couchID = self.uploadConf(taskConf, str(wfKey)+task['TaskName'], request['CouchURL'])
            print(' '.join(str(piece) for piece in (taskConf, " uploaded to couchDB for", str(wfKey), "with ID", couchID)))
            task['ConfigCacheID'] = couchID
354 
def MatrixInjector.MatrixInjector.uploadConf (   self,
  filePath,
  label,
  where 
)

Definition at line 308 of file MatrixInjector.py.

References MatrixInjector.MatrixInjector.couchCache, TmCcu.count, TmApvPair.count, TmModule.count, TmPsu.count, MatrixInjector.MatrixInjector.count, ValidationMisalignedTracker.count, SiStripDetSummary::Values.count, web_results_display.WebResultsDisplay.count, MD5.count, MatrixInjector.MatrixInjector.group, ElectronLikelihoodCategoryData.label, SiPixelFedFillerWordEventNumber.label, HcalLutSet.label, TtEvent::HypoClassKeyStringToEnum.label, L1GtBoardTypeStringToEnum.label, DTResidualCalibration.DTResidualCalibration.label, DTTTrigValid.DTTTrigValid.label, DTDQMHarvesting.DTDQMHarvesting.label, DTTTrigResidualCorr.DTTTrigResidualCorr.label, MatrixInjector.MatrixInjector.label, L1GtPsbQuadStringToEnum.label, ValidationMisalignedTracker.label, L1GtConditionTypeStringToEnum.label, L1GtConditionCategoryStringToEnum.label, PhysicsTools::Calibration::Comparator.label, MatrixInjector.MatrixInjector.testMode, EcalTPGParamReaderFromDB.user, popcon::RpcDataV.user, popcon::RpcDataT.user, popcon::RpcObGasData.user, popcon::RPCObPVSSmapData.user, popcon::RpcDataI.user, popcon::RpcDataS.user, popcon::RpcDataUXC.user, popcon::RpcDataGasMix.user, popcon::RpcDataFebmap.user, and MatrixInjector.MatrixInjector.user.

Referenced by MatrixInjector.MatrixInjector.upload().

def uploadConf(self,filePath,label,where):
    """Upload one configuration file to couch and return its id.

    Parameters:
      filePath -- path of the configuration file; its last '/'-separated
                  component is the key used in self.couchCache.
      label    -- suffix appended to self.label to form the couch label.
      where    -- couch URL handed to the upload helper.

    Returns:
      In test mode, the incremented self.count (nothing is uploaded).
      Otherwise the cached id for this file name when one exists, else the
      id returned by the upload, which is then stored in self.couchCache.

    Raises:
      SystemExit -- exit code -16 when the wmcontrol modules cannot be
                    imported outside test mode.
    """
    labelInCouch=self.label+'_'+label
    cacheName=filePath.split('/')[-1]
    if self.testMode:
        self.count+=1
        print('\tFake upload of %s to couch with label %s' % (filePath, labelInCouch))
        return self.count

    try:
        # The imported names are not used below; the import only checks that
        # the wmcontrol modules are reachable.
        from modules.wma import upload_to_couch,DATABASE_NAME
    except Exception:
        # Not a bare except: Ctrl-C and sys.exit must not be swallowed here.
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        print('\n\t QUIT\n')
        sys.exit(-16)

    if cacheName in self.couchCache:
        print("Not re-uploading %s to %s for %s" % (filePath, where, label))
        return self.couchCache[cacheName]

    print("Loading %s to %s for %s" % (filePath, where, label))
    ## totally fork the upload to couch to prevent cross loading of process configurations
    pool = multiprocessing.Pool(1)
    try:
        cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
    finally:
        # Release the worker process; previously one was leaked per upload.
        pool.close()
        pool.join()
    cacheId = cacheIds[0]
    self.couchCache[cacheName]=cacheId
    return cacheId

Member Data Documentation

MatrixInjector.MatrixInjector.chainDicts

Definition at line 134 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.couch

Definition at line 50 of file MatrixInjector.py.

MatrixInjector.MatrixInjector.couchCache

Definition at line 52 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.count

Definition at line 39 of file MatrixInjector.py.

Referenced by NodeCut.MatchLessSeen.match(), NodeCut.MatchLessHit.match(), and MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.defaultChain

Definition at line 69 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultHarvest

Definition at line 94 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultInput

Definition at line 112 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultScratch

Definition at line 100 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.defaultTask

Definition at line 122 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.group

Definition at line 54 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.keep

Definition at line 42 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.label

Definition at line 55 of file MatrixInjector.py.

Referenced by Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor._sort_list(), python.rootplot.root2matplotlib.Hist.bar(), python.rootplot.root2matplotlib.Hist.barh(), python.rootplot.root2matplotlib.Hist.errorbar(), python.rootplot.root2matplotlib.Hist.errorbarh(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.foundIn(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.fullFilename(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.inputEventContent(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.outputEventContent(), Vispa.Plugins.ConfigEditor.ToolDataAccessor.ToolDataAccessor.properties(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor.properties(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.properties(), Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.readConnections(), Vispa.Plugins.ConfigEditor.ToolDataAccessor.ToolDataAccessor.updateProcess(), MatrixInjector.MatrixInjector.uploadConf(), and Vispa.Plugins.ConfigEditor.ConfigDataAccessor.ConfigDataAccessor.usedBy().

MatrixInjector.MatrixInjector.speciallabel

Definition at line 56 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.prepare().

MatrixInjector.MatrixInjector.testMode

Definition at line 40 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.submit(), and MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.user

Definition at line 53 of file MatrixInjector.py.

Referenced by cmsPerfSuite.PerfSuite.optionParse(), and MatrixInjector.MatrixInjector.uploadConf().

MatrixInjector.MatrixInjector.version

Definition at line 41 of file MatrixInjector.py.

Referenced by argparse._VersionAction.__call__(), python.rootplot.argparse._VersionAction.__call__(), argparse.ArgumentParser.__init__(), argparse.ArgumentParser.format_version(), and python.rootplot.argparse.ArgumentParser.format_version().

MatrixInjector.MatrixInjector.wmagent

Definition at line 45 of file MatrixInjector.py.

Referenced by MatrixInjector.MatrixInjector.submit().