MatrixInjector::MatrixInjector Class Reference

Public Member Functions

def __init__
def prepare
def submit
def upload
def uploadConf

Public Attributes

 chainDicts
 couch
 couchCache
 count
 defaultChain
 defaultHarvest
 defaultInput
 defaultScratch
 defaultTask
 dqmgui
 group
 keep
 label
 speciallabel
 testMode
 user
 version
 wmagent

Detailed Description

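Builds ReqMgr TaskChain request dictionaries for the release-validation (RelVal) workflows of a matrix run from each workflow's step configurations and .io files, uploads the step configurations to the CouchDB config cache, and submits and approves the requests through the wmcontrol modules. Unless the mode is 'submit' or 'force', the injector runs in test mode: uploads are simulated and requests are only printed.

Definition at line 36 of file MatrixInjector.py.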


Constructor & Destructor Documentation

def MatrixInjector::MatrixInjector::__init__ (self, opt, mode = 'init', options = '')

Definition at line 38 of file MatrixInjector.py.

def __init__(self, opt, mode='init', options=''):
    self.count=1040

    self.dqmgui=None
    self.wmagent=None
    for k in options.split(','):
        if k.startswith('dqm:'):
            self.dqmgui=k.split(':',1)[-1]
        elif k.startswith('wma:'):
            self.wmagent=k.split(':',1)[-1]

    self.testMode=((mode!='submit') and (mode!='force'))
    self.version =1
    self.keep = opt.keep

    # wmagent stuff
    if not self.wmagent:
        self.wmagent=os.getenv('WMAGENT_REQMGR')
    if not self.wmagent:
        self.wmagent = 'cmsweb.cern.ch'

    if not self.dqmgui:
        self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
    # couch stuff
    self.couch = 'https://'+self.wmagent+'/couchdb'
    # self.couchDB = 'reqmgr_config_cache'
    self.couchCache={} # so that we do not upload like crazy, and recycle cfgs
    self.user = os.getenv('USER')
    self.group = 'ppd'
    self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
    self.speciallabel=''
    if opt.label:
        self.speciallabel= '_'+opt.label


    if not os.getenv('WMCORE_ROOT'):
        print '\n\twmclient is not set up properly. Will not be able to upload or submit requests.\n'
        if not self.testMode:
            print '\n\t QUIT\n'
            sys.exit(-18)
    else:
        print '\n\tFound wmclient\n'

    self.defaultChain={
        "RequestType" :   "TaskChain",                    #this is how we handle relvals
        "Requestor": self.user,                           #Person responsible
        "Group": self.group,                              #group for the request
        "CMSSWVersion": os.getenv('CMSSW_VERSION'),       #CMSSW Version (used for all tasks in chain)
        "Campaign": os.getenv('CMSSW_VERSION'),           # only for wmstat purpose
        "ScramArch": os.getenv('SCRAM_ARCH'),             #Scram Arch (used for all tasks in chain)
        "ProcessingVersion": self.version,                #Processing Version (used for all tasks in chain)
        "GlobalTag": None,                                #Global Tag (overridden per task)
        "CouchURL": self.couch,                           #URL of CouchDB containing Config Cache
        "ConfigCacheURL": self.couch,                     #URL of CouchDB containing Config Cache
        "DbsUrl": "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet",
        #"CouchDBName": self.couchDB,                      #Name of Couch Database containing config cache
        #- Will contain all configs for all Tasks
        #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"],   #Site whitelist
        "TaskChain" : None,                               #Define number of tasks in chain.
        "nowmTasklist" : [],  #a list of tasks as we put them in
        "unmergedLFNBase" : "/store/unmerged",
        "mergedLFNBase" : "/store/relval",
        "dashboardActivity" : "relval",
        "Memory" : 2400,
        "SizePerEvent" : 1234,
        "TimePerEvent" : 20
        }

    self.defaultHarvest={
        "EnableHarvesting" : "True",
        "DQMUploadUrl" : self.dqmgui,
        "DQMConfigCacheID" : None
        }

    self.defaultScratch={
        "TaskName" : None,                           #Task Name
        "ConfigCacheID" : None,                      #Generator Config id
        "GlobalTag": None,
        "SplittingAlgo"  : "EventBased",             #Splitting Algorithm
        "EventsPerJob" : None,                       #Size of jobs in terms of splitting algorithm
        "RequestNumEvents" : None,                   #Total number of events to generate
        "Seeding" : "AutomaticSeeding",              #Random seeding method
        "PrimaryDataset" : None,                     #Primary Dataset to be created
        "nowmIO": {},
        "KeepOutput" : False
        }
    self.defaultInput={
        "TaskName" : "DigiHLT",                      #Task Name
        "ConfigCacheID" : None,                      #Processing Config id
        "GlobalTag": None,
        "InputDataset" : None,                       #Input Dataset to be processed
        "SplittingAlgo"  : "LumiBased",              #Splitting Algorithm
        "LumisPerJob" : 10,                          #Size of jobs in terms of splitting algorithm
        "nowmIO": {},
        "KeepOutput" : False
        }
    self.defaultTask={
        "TaskName" : None,                           #Task Name
        "InputTask" : None,                          #Input Task Name (Task Name field of a previous Task entry)
        "InputFromOutputModule" : None,              #OutputModule name in the input task that will provide files to process
        "ConfigCacheID" : None,                      #Processing Config id
        "GlobalTag": None,
        "SplittingAlgo"  : "LumiBased",              #Splitting Algorithm
        "LumisPerJob" : 10,                          #Size of jobs in terms of splitting algorithm
        "nowmIO": {},
        "KeepOutput" : False
        }

    self.chainDicts={}

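The option and endpoint handling at the start of __init__ can be restated as a small pure function, which makes the precedence easy to see: an explicit wma: or dqm: entry wins, then (for the request manager only) the WMAGENT_REQMGR environment variable, then the hard-coded cmsweb defaults. The sketch below is a hypothetical helper (resolve_endpoints is not part of MatrixInjector.py); it takes the environment as a mapping so the logic can be exercised without touching os.environ, and the host names in the usage note are placeholders.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_endpoints(options: str, mode: str = "init",
                      env: Optional[Mapping[str, str]] = None) -> Tuple[str, str, str, bool]:
    """Hypothetical restatement of the option handling in MatrixInjector.__init__.

    Returns (wmagent, dqmgui, couch_url, test_mode).
    """
    env = os.environ if env is None else env
    dqmgui = None
    wmagent = None
    for k in options.split(','):
        # split on the first ':' only, so a value such as https://... survives intact
        if k.startswith('dqm:'):
            dqmgui = k.split(':', 1)[-1]
        elif k.startswith('wma:'):
            wmagent = k.split(':', 1)[-1]

    # request manager: explicit option, then environment, then default host
    if not wmagent:
        wmagent = env.get('WMAGENT_REQMGR')
    if not wmagent:
        wmagent = 'cmsweb.cern.ch'
    # DQM GUI: explicit option, then default URL (no environment fallback)
    if not dqmgui:
        dqmgui = "https://cmsweb.cern.ch/dqm/relval"

    couch = 'https://' + wmagent + '/couchdb'
    # only 'submit' and 'force' leave test (dry-run) mode
    test_mode = mode not in ('submit', 'force')
    return wmagent, dqmgui, couch, test_mode
```

For example, resolve_endpoints('wma:reqmgr.example.org', 'submit', env={}) gives ('reqmgr.example.org', 'https://cmsweb.cern.ch/dqm/relval', 'https://reqmgr.example.org/couchdb', False). Splitting on the first ':' only is what lets a dqm: entry carry a full https:// URL.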

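prepare starts every request from copy.deepcopy(self.defaultChain) and every task from a deep copy of defaultScratch, defaultInput or defaultTask. The deep copy is what keeps requests independent: defaultChain holds a mutable list (nowmTasklist) that prepare appends to, and a shallow copy would share that list with the template and with every other request. The sketch below uses a trimmed, hypothetical template with only two of the real keys to show the difference.

```python
import copy

# Trimmed, hypothetical stand-in for MatrixInjector.defaultChain: only the
# mutable "nowmTasklist" entry matters for this illustration.
DEFAULT_CHAIN = {"RequestType": "TaskChain", "nowmTasklist": []}


def build_chain(template, task_names, deep=True):
    """Copy the template and append one task dict per name to nowmTasklist."""
    # dict(template) copies only the top level, so the inner list is shared
    chain = copy.deepcopy(template) if deep else dict(template)
    for name in task_names:
        chain["nowmTasklist"].append({"TaskName": name})
    return chain


# Deep copies: each request gets its own task list and the template stays empty.
first = build_chain(DEFAULT_CHAIN, ["step1", "step2"])
second = build_chain(DEFAULT_CHAIN, ["step1"])

# Shallow copies: both requests and the template end up sharing one list.
shared_template = {"RequestType": "TaskChain", "nowmTasklist": []}
third = build_chain(shared_template, ["step1"], deep=False)
fourth = build_chain(shared_template, ["step2"], deep=False)
```

After the shallow copies, shared_template itself lists both step1 and step2, which is exactly the cross-talk between requests that the deep copy in prepare avoids.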
Member Function Documentation

def MatrixInjector::MatrixInjector::prepare (self, mReader, directories, mode = 'init')

Definition at line 149 of file MatrixInjector.py.

def prepare(self, mReader, directories, mode='init'):
    try:
        #from Configuration.PyReleaseValidation.relval_steps import wmsplit
        wmsplit = {}
        wmsplit['DIGIHI']=5
        wmsplit['RECOHI']=5
        wmsplit['HLTD']=5
        wmsplit['RECODreHLT']=2
        wmsplit['DIGIPU']=4
        wmsplit['DIGIPU1']=4
        wmsplit['RECOPU1']=1
        wmsplit['DIGIHISt3']=5
        wmsplit['RECOHISt4']=5
        wmsplit['SingleMuPt10_ID']=1
        wmsplit['DIGI_ID']=1
        wmsplit['RECO_ID']=1
        wmsplit['TTbar_ID']=1
        wmsplit['SingleMuPt10FS_ID']=1
        wmsplit['TTbarFS_ID']=1

        #import pprint
        #pprint.pprint(wmsplit)
    except:
        print "Not set up for step splitting"
        wmsplit={}

    acqEra=False
    for (n,dir) in directories.items():
        chainDict=copy.deepcopy(self.defaultChain)
        print "inspecting",dir
        nextHasDSInput=None
        for (x,s) in mReader.workFlowSteps.items():
            #x has the format (num, prefix)
            #s has the format (num, name, commands, stepList)
            if x[0]==n:
                #print "found",n,s[3]
                #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                index=0
                splitForThisWf=None
                thisLabel=self.speciallabel
                processStrPrefix=''
                setPrimaryDs=None
                for step in s[3]:

                    if 'INPUT' in step or (not isinstance(s[2][index],str)):
                        nextHasDSInput=s[2][index]

                    else:

                        if (index==0):
                            #first step and not input -> gen part
                            chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
                            try:
                                chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                            except:
                                print "Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run or the cfg was not created"
                                return -15

                            chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
                            if not '--relval' in s[2][index]:
                                print 'Impossible to create task from scratch without splitting information with --relval'
                                return -12
                            else:
                                arg=s[2][index].split()
                                ns=map(int,arg[arg.index('--relval')+1].split(','))
                                chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
                                chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
                            if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
                                thisLabel+='_FastSim'

                        elif nextHasDSInput:
                            chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
                            try:
                                chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                            except:
                                print "Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run or the cfg was not created"
                                return -15
                            chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
                            splitForThisWf=nextHasDSInput.split
                            chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                            if step in wmsplit:
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
                            # get the run numbers or #events
                            if len(nextHasDSInput.run):
                                chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
                            #print "what is s",s[2][index]
                            if '--data' in s[2][index] and nextHasDSInput.label:
                                thisLabel+='_RelVal_%s'%nextHasDSInput.label
                            if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
                                print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
                                processStrPrefix='PU_'
                                setPrimaryDs = 'RelVal'+s[1].split('+')[0]
                                if setPrimaryDs:
                                    chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
                            nextHasDSInput=None
                        else:
                            #not first step and no inputDS
                            chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
                            try:
                                chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                            except:
                                print "Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run or the cfg was not created"
                                return -15
                            if splitForThisWf:
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
                            if step in wmsplit:
                                chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]

                        #print step
                        chainDict['nowmTasklist'][-1]['TaskName']=step
                        if setPrimaryDs:
                            chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
                        chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
                        chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
                        chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
                        if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
                            chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
                        if '--pileup' in s[2][index]:
                            processStrPrefix='PU_'

                        if acqEra:
                            #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                            chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
                            chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
                        else:
                            #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
                            chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
                            chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel

                    index+=1
                #end of loop through steps
                chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                if processStrPrefix or thisLabel:
                    chainDict['RequestString']+='_'+processStrPrefix+thisLabel

        #wrap up for this one
        import pprint
        #print 'wrapping up'
        #pprint.pprint(chainDict)
        #loop on the task list
        for i_second in reversed(range(len(chainDict['nowmTasklist']))):
            t_second=chainDict['nowmTasklist'][i_second]
            #print "t_second taskname", t_second['TaskName']
            if 'primary' in t_second['nowmIO']:
                #print t_second['nowmIO']['primary']
                primary=t_second['nowmIO']['primary'][0].replace('file:','')
                for i_input in reversed(range(0,i_second)):
                    t_input=chainDict['nowmTasklist'][i_input]
                    for (om,o) in t_input['nowmIO'].items():
                        if primary in o:
                            #print "found",primary,"produced by",om,"of",t_input['TaskName']
                            t_second['InputTask'] = t_input['TaskName']
                            t_second['InputFromOutputModule'] = om
                            #print 't_second',pprint.pformat(t_second)
                            if t_second['TaskName'].startswith('HARVEST'):
                                chainDict.update(copy.deepcopy(self.defaultHarvest))
                                chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
                                ## the info are not in the task specific dict but in the general dict
                                #t_input.update(copy.deepcopy(self.defaultHarvest))
                                #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
                            break

        ## there is in fact only one acquisition era
        #if len(set(chainDict['AcquisitionEra'].values()))==1:
        #    print "setting only one acq"
        if acqEra:
            chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]

        ## clean things up now
        itask=0
        if self.keep:
            for i in self.keep:
                if type(i)==int and i < len(chainDict['nowmTasklist']):
                    chainDict['nowmTasklist'][i]['KeepOutput']=True
        for (i,t) in enumerate(chainDict['nowmTasklist']):
            if t['TaskName'].startswith('HARVEST'):
                continue
            if not self.keep:
                t['KeepOutput']=True
            elif t['TaskName'] in self.keep:
                t['KeepOutput']=True
            t.pop('nowmIO')
            itask+=1
            chainDict['Task%d'%(itask)]=t

        ## provide the number of tasks
        chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])

        chainDict.pop('nowmTasklist')
        self.chainDicts[n]=chainDict

    return 0
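
Three string manipulations in prepare are easy to miss in the listing: the --relval N,M argument of the first (generation) step supplies RequestNumEvents and EventsPerJob, the ProcessingString is the prefix ('PU_' or empty) plus the global tag with any '::All' suffix removed plus the special label, and the RequestString is built from the release and the first '+'-separated part of the workflow name. The helpers below are hypothetical, standalone restatements of those expressions; the command lines, release and global tag in the test values are made up.

```python
from typing import Optional, Tuple


def relval_split(command: str) -> Optional[Tuple[int, int]]:
    """(RequestNumEvents, EventsPerJob) from a '--relval N,M' option, or None."""
    args = command.split()
    # stricter than the listing, which tests for the substring '--relval' in the
    # whole command; prepare() returns -12 when the option is missing
    if '--relval' not in args:
        return None
    # a list comprehension instead of map(): in Python 3, map() is lazy and
    # cannot be indexed, whereas the Python 2 listing relies on map() giving a list
    values = [int(v) for v in args[args.index('--relval') + 1].split(',')]
    return values[0], values[1]


def processing_string(global_tag: str, label: str = '', pileup: bool = False) -> str:
    """Prefix ('PU_' for pileup) + global tag without '::All' + special label."""
    prefix = 'PU_' if pileup else ''
    return prefix + global_tag.replace('::All', '') + label


def request_string(cmssw_version: str, workflow_name: str,
                   prefix: str = '', label: str = '') -> str:
    """'RV' + release + first '+'-separated part of the workflow name (+ suffix)."""
    name = 'RV' + cmssw_version + workflow_name.split('+')[0]
    if prefix or label:
        # reproduces the listing as written: the special label always starts
        # with '_', so a non-empty label yields a double underscore
        name += '_' + prefix + label
    return name
```

For instance, relval_split('cmsDriver.py TTbar_cfi --relval 9000,100 -s GEN,SIM') returns (9000, 100), and processing_string('MYTAG_V1::All', '_FastSim', pileup=True) returns 'PU_MYTAG_V1_FastSim'.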

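After all steps of a workflow are collected, prepare wires the tasks together and numbers them. The following is a simplified, hypothetical restatement (finalize_chain and find_producer are illustrative names), assuming each task's nowmIO maps output-module names to lists of produced files, with a 'primary' list naming the task's input. It links a task to the nearest preceding task that lists its primary input, records the harvesting configuration at request level instead of creating a TaskN entry, applies the keep rules, and numbers the remaining tasks Task1, Task2 and so on. It does not reproduce everything in the listing; for example, the defaultHarvest keys other than DQMConfigCacheID are omitted.

```python
import copy
from typing import List, Optional, Sequence, Tuple, Union


def find_producer(tasks: List[dict], i_second: int,
                  filename: str) -> Optional[Tuple[str, str]]:
    """(TaskName, output module) of the nearest earlier task listing `filename`."""
    for i_input in reversed(range(i_second)):
        for module, files in tasks[i_input]['nowmIO'].items():
            # skip the task's own input list and non-list entries such as 'GT'
            if module != 'primary' and isinstance(files, list) and filename in files:
                return tasks[i_input]['TaskName'], module
    return None


def finalize_chain(tasks: List[dict],
                   keep: Sequence[Union[int, str]] = ()) -> dict:
    """Link tasks to their inputs, apply keep rules and number them Task1..TaskN."""
    tasks = copy.deepcopy(tasks)  # leave the caller's list untouched
    chain = {}
    for i, task in enumerate(tasks):
        primary = task['nowmIO'].get('primary')
        if not primary:
            continue
        found = find_producer(tasks, i, primary[0].replace('file:', ''))
        if found:
            task['InputTask'], task['InputFromOutputModule'] = found
            if task['TaskName'].startswith('HARVEST'):
                # harvesting is configured on the request, not as a TaskN entry
                chain['DQMConfigCacheID'] = task.get('ConfigCacheID')

    # integer entries in `keep` are task positions
    for k in keep:
        if isinstance(k, int) and k < len(tasks):
            tasks[k]['KeepOutput'] = True

    itask = 0
    for task in tasks:
        if task['TaskName'].startswith('HARVEST'):
            continue
        # empty keep list: keep every output; otherwise keep only the named tasks
        if not keep or task['TaskName'] in keep:
            task['KeepOutput'] = True
        task.pop('nowmIO')
        itask += 1
        chain['Task%d' % itask] = task
    chain['TaskChain'] = itask
    return chain
```

Because the harvesting step is skipped during numbering, TaskChain counts only the processing tasks, which matches the listing's use of itask rather than len(chainDict['nowmTasklist']).
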
def MatrixInjector::MatrixInjector::submit (self)

Definition at line 396 of file MatrixInjector.py.

def submit(self):
    try:
        from modules.wma import makeRequest,approveRequest
        from wmcontrol import random_sleep
        print '\n\tFound wmcontrol\n'
    except:
        print '\n\tUnable to find wmcontrol modules. Please add them to your python path\n'
        if not self.testMode:
            print '\n\t QUIT\n'
            sys.exit(-17)

    import pprint
    for (n,d) in self.chainDicts.items():
        if self.testMode:
            print "Only viewing request",n
            pprint.pprint(d)  # pprint prints by itself; wrapping it in print would also print None
        else:
            #submit to wmagent each dict
            print "For eyes before submitting",n
            pprint.pprint(d)
            print "Submitting",n,"..........."
            workFlow=makeRequest(self.wmagent,d,encodeDict=True)
            approveRequest(self.wmagent,workFlow)
            print "...........",n,"submitted"
            random_sleep()
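
The control flow of submit is: print every request, and only outside test mode create and approve it. The sketch below restates that loop with the wmcontrol calls replaced by injected callables (submit_all, make_request and approve_request are hypothetical stand-ins whose simplified signatures are assumptions, not the wmcontrol API); the random_sleep pause and the encodeDict argument are left out.

```python
import pprint
from typing import Callable, Dict, List


def submit_all(chain_dicts: Dict[int, dict], test_mode: bool,
               make_request: Callable[[str, dict], str],
               approve_request: Callable[[str, str], None],
               reqmgr: str = 'cmsweb.cern.ch') -> List[str]:
    """Print every request; outside test mode also create and approve it."""
    submitted = []
    for n, d in sorted(chain_dicts.items()):
        print("Only viewing request" if test_mode else "For eyes before submitting", n)
        pprint.pprint(d)  # pprint writes to stdout itself and returns None
        if test_mode:
            continue
        workflow = make_request(reqmgr, d)   # stand-in for makeRequest in the listing
        approve_request(reqmgr, workflow)    # stand-in for approveRequest in the listing
        submitted.append(workflow)
    return submitted
```

Injecting the two calls keeps the dry-run path testable without a request manager: in test mode neither callable is ever invoked.
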
def MatrixInjector::MatrixInjector::upload (self)

Definition at line 376 of file MatrixInjector.py.

def upload(self):
    for (n,d) in self.chainDicts.items():
        for it in d:
            if it.startswith("Task") and it!='TaskChain':
                #upload
                couchID=self.uploadConf(d[it]['ConfigCacheID'],
                                        str(n)+d[it]['TaskName'],
                                        d['CouchURL']
                                        )
                print d[it]['ConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
                d[it]['ConfigCacheID']=couchID
            if it =='DQMConfigCacheID':
                couchID=self.uploadConf(d['DQMConfigCacheID'],
                                        str(n)+'harvesting',
                                        d['CouchURL']
                                        )
                print d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
                d['DQMConfigCacheID']=couchID
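
upload walks each request dictionary and swaps every configuration path for the ID returned by uploadConf. Because the task keys are generated as Task1, Task2 and so on, the selection is by prefix, with TaskChain (which also starts with Task) excluded explicitly. The sketch below restates that selection with a hypothetical upload callable standing in for uploadConf.

```python
from typing import Any, Callable, Dict


def upload_chain(n: Any, d: Dict[str, Any],
                 upload: Callable[[str, str, str], Any]) -> None:
    """Replace config paths in request `d` with the IDs returned by `upload`."""
    for key in list(d):
        # Task1, Task2, ... are tasks; TaskChain is the task count, not a task
        if key.startswith('Task') and key != 'TaskChain':
            task = d[key]
            task['ConfigCacheID'] = upload(task['ConfigCacheID'],
                                           str(n) + task['TaskName'],
                                           d['CouchURL'])
        elif key == 'DQMConfigCacheID':
            # the harvesting config lives on the request itself
            d[key] = upload(d[key], str(n) + 'harvesting', d['CouchURL'])
```

The upload label is the workflow number followed by the task name (or 'harvesting'), as in the listing; uploadConf then prefixes it with the injector's RelValSet label.
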
def MatrixInjector::MatrixInjector::uploadConf (self, filePath, label, where)

Definition at line 349 of file MatrixInjector.py.

def uploadConf(self, filePath, label, where):
    labelInCouch=self.label+'_'+label
    cacheName=filePath.split('/')[-1]
    if self.testMode:
        self.count+=1
        print '\tFake upload of',filePath,'to couch with label',labelInCouch
        return self.count
    else:
        try:
            from modules.wma import upload_to_couch,DATABASE_NAME
        except:
            print '\n\tUnable to find wmcontrol modules. Please add them to your python path\n'
            print '\n\t QUIT\n'
            sys.exit(-16)

        if cacheName in self.couchCache:
            print "Not re-uploading",filePath,"to",where,"for",label
            cacheId=self.couchCache[cacheName]
        else:
            print "Loading",filePath,"to",where,"for",label
            ## totally fork the upload to couch to prevent cross loading of process configurations
            ## (upload_to_couch_oneArg is a module-level helper, not shown on this page)
            pool = multiprocessing.Pool(1)
            cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
            pool.close()  # release the single worker process
            pool.join()
            cacheId = cacheIds[0]
            self.couchCache[cacheName]=cacheId
        return cacheId

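uploadConf has two behaviours worth isolating: in test mode it never contacts CouchDB and returns an incrementing counter (starting after the 1040 set in __init__), and otherwise it caches returned IDs by file name, so a configuration whose file name was already uploaded is reused rather than uploaded again. The class below is a hypothetical stand-in for that logic; the upload callable replaces the real upload_to_couch path and its child-process isolation.

```python
from typing import Callable, Dict


class ConfigUploader:
    """Hypothetical stand-in for the caching logic of MatrixInjector.uploadConf."""

    def __init__(self, base_label: str, upload: Callable[[str, str, str], str],
                 test_mode: bool = True, start_count: int = 1040):
        self.base_label = base_label
        self.upload = upload          # stand-in for the real CouchDB upload call
        self.test_mode = test_mode
        self.count = start_count      # source of fake IDs in test mode
        self.cache: Dict[str, str] = {}

    def upload_conf(self, file_path: str, label: str, where: str):
        label_in_couch = self.base_label + '_' + label
        cache_name = file_path.split('/')[-1]  # cache key: file name, not full path
        if self.test_mode:
            # no caching in test mode: every call gets a new fake ID
            self.count += 1
            return self.count
        if cache_name not in self.cache:
            self.cache[cache_name] = self.upload(file_path, label_in_couch, where)
        return self.cache[cache_name]
```

Keying the cache on the file name rather than the full path is what lets identical step configurations from different workflow directories share one upload; the flip side is that two different files with the same name would also share an ID.
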
Member Data Documentation

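MatrixInjector::MatrixInjector::chainDicts

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::couch

Definition at line 38 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::couchCache

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::count

Definition at line 38 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::defaultChain

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::defaultHarvest

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::defaultInput

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::defaultScratch

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::defaultTask

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::dqmgui

Definition at line 38 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::group

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::keep

Definition at line 38 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::label

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::speciallabel

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::testMode

Definition at line 38 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::user

Definition at line 39 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::version

Definition at line 38 of file MatrixInjector.py.

MatrixInjector::MatrixInjector::wmagent

Definition at line 38 of file MatrixInjector.py.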