MatrixInjector::MatrixInjector Class Reference

Public Member Functions

def __init__
def prepare
def submit
def upload
def uploadConf

Public Attributes

 chainDicts
 couch
 couchCache
 couchDB
 count
 defaultChain
 defaultHarvest
 defaultInput
 defaultScratch
 defaultTask
 group
 keep
 label
 speciallabel
 testMode
 user
 version
 wmagent

Detailed Description

Definition at line 23 of file MatrixInjector.py.


Constructor & Destructor Documentation

def MatrixInjector::MatrixInjector::__init__(self, opt, mode = 'init')

Definition at line 25 of file MatrixInjector.py.

00026     def __init__(self,opt,mode='init'):
00027         self.count=1040
00028         self.testMode=((mode!='submit') and (mode!='force'))
00029         self.version =1
00030         self.keep = opt.keep
00031 
00032         #wmagent stuff
00033         self.wmagent=os.getenv('WMAGENT_REQMGR')
00034         if not self.wmagent:
00035             self.wmagent = 'cmsweb.cern.ch'
00036             
00037         #couch stuff
00038         self.couch = 'https://'+self.wmagent+'/couchdb'
00039         self.couchDB = 'reqmgr_config_cache'
00040         self.couchCache={} # so that we do not upload like crazy, and recycle cfgs
00041         self.user = os.getenv('USER')
00042         self.group = 'ppd'
00043         self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
00044         self.speciallabel=''
00045         if opt.label:
00046             self.speciallabel= '_'+opt.label
00047 
00048 
00049         if not os.getenv('WMCORE_ROOT'):
00050             print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n'
00051             if not self.testMode:
00052                 print '\n\t QUIT\n'
00053                 sys.exit(-18)
00054         else:
00055             print '\n\tFound wmclient\n'
00056             
00057         self.defaultChain={
00058             "RequestType" :   "TaskChain",                    #this is how we handle relvals
00059             "AcquisitionEra": {},                             #Acq Era
00060             "ProcessingString": {},                           # processing string to label the dataset
00061             "Requestor": self.user,                           #Person responsible
00062             "Group": self.group,                              #group for the request
00063             "CMSSWVersion": os.getenv('CMSSW_VERSION'),       #CMSSW Version (used for all tasks in chain)
00064             "Campaign": os.getenv('CMSSW_VERSION'),           # only for wmstat purpose
00065             "ScramArch": os.getenv('SCRAM_ARCH'),             #Scram Arch (used for all tasks in chain)
00066             "ProcessingVersion": self.version,                #Processing Version (used for all tasks in chain)
00067             "GlobalTag": None,                                #Global Tag (overridden per task)
00068             "CouchURL": self.couch,                           #URL of CouchDB containing Config Cache
00069             "CouchDBName": self.couchDB,                      #Name of Couch Database containing config cache
00070             #- Will contain all configs for all Tasks
00071             "SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"],   #Site whitelist
00072             "TaskChain" : None,                                  #Define number of tasks in chain.
00073             "nowmTasklist" : [],  #a list of tasks as we put them in
00074             "unmergedLFNBase" : "/store/unmerged",
00075             "mergedLFNBase" : "/store/relval",
00076             "dashboardActivity" : "relval",
00077             "Memory" : 2400,
00078             "SizePerEvent" : 1234,
00079             "TimePerEvent" : 20
00080             }
00081 
00082         self.defaultHarvest={
00083             "EnableDQMHarvest" : 1,
00084             "DQMUploadUrl" : "https://cmsweb.cern.ch/dqm/relval",
00085             "DQMConfigCacheID" : None
00086             }
00087         
00088         self.defaultScratch={
00089             "TaskName" : None,                            #Task Name
00090             "ConfigCacheID" : None,                   #Generator Config id
00091             "GlobalTag": None,
00092             "SplittingAlgorithm"  : "EventBased",             #Splitting Algorithm
00093             "SplittingArguments" : {"events_per_job" : None},  #Size of jobs in terms of splitting algorithm
00094             "RequestNumEvents" : None,                      #Total number of events to generate
00095             "Seeding" : "AutomaticSeeding",                          #Random seeding method
00096             "PrimaryDataset" : None,                          #Primary Dataset to be created
00097             "nowmIO": {},
00098             "KeepOutput" : False
00099             }
00100         self.defaultInput={
00101             "TaskName" : "DigiHLT",                                      #Task Name
00102             "ConfigCacheID" : None,                                      #Processing Config id
00103             "GlobalTag": None,
00104             "InputDataset" : None,                                       #Input Dataset to be processed
00105             "SplittingAlgorithm"  : "LumiBased",                        #Splitting Algorithm
00106             "SplittingArguments" : {"lumis_per_job" : 10},               #Size of jobs in terms of splitting algorithm
00107             "nowmIO": {},
00108             "KeepOutput" : False
00109             }
00110         self.defaultTask={
00111             "TaskName" : None,                                 #Task Name
00112             "InputTask" : None,                                #Input Task Name (Task Name field of a previous Task entry)
00113             "InputFromOutputModule" : None,                    #OutputModule name in the input task that will provide files to process
00114             "ConfigCacheID" : None,                            #Processing Config id
00115             "GlobalTag": None,
00116             "SplittingAlgorithm"  : "LumiBased",                        #Splitting Algorithm
00117             "SplittingArguments" : {"lumis_per_job" : 10},               #Size of jobs in terms of splitting algorithm
00118             "nowmIO": {},
00119             "KeepOutput" : False
00120             }
00121 
00122         self.chainDicts={}
00123 
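
The two derived values set by the constructor, testMode and label, can be reproduced in isolation. The sketch below is written for Python 3 (the listing above is Python 2). The helper names is_test_mode and build_label and the sample version string are illustrative assumptions, not part of MatrixInjector.py. Note that the listing calls .replace directly on os.getenv('CMSSW_VERSION'), so with that variable unset getenv returns None and the call raises AttributeError.

```python
def is_test_mode(mode="init"):
    """Mirror the constructor's rule: only 'submit' and 'force' are live modes."""
    return (mode != "submit") and (mode != "force")


def build_label(cmssw_version, version=1):
    """Mirror how self.label is assembled: dashes are dropped from the release name."""
    return "RelValSet_" + cmssw_version.replace("-", "") + "_v" + str(version)


# Hypothetical release name, used only to show the dash removal.
print(is_test_mode("init"))               # True
print(is_test_mode("submit"))             # False
print(build_label("CMSSW_1_2_3-patch1"))  # RelValSet_CMSSW_1_2_3patch1_v1
```

Any mode other than 'submit' or 'force' therefore leaves the injector in test mode, where uploads are faked and requests are only printed.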


Member Function Documentation

def MatrixInjector::MatrixInjector::prepare(self, mReader, directories, mode = 'init')

Definition at line 124 of file MatrixInjector.py.

00125     def prepare(self, mReader, directories, mode='init'):
00126         try:
00127             from Configuration.PyReleaseValidation.relval_steps import wmsplit
00128             import pprint
00129             pprint.pprint(wmsplit)
00130         except:
00131             print "Not set up for step splitting"
00132             wmsplit={}
00133 
00134         acqEra=False
00135         for (n,dir) in directories.items():
00136             chainDict=copy.deepcopy(self.defaultChain)
00137             print "inspecting",dir
00138             nextHasDSInput=None
00139             for (x,s) in mReader.workFlowSteps.items():
00140                 #x has the format (num, prefix)
00141                 #s has the format (num, name, commands, stepList)
00142                 if x[0]==n:
00143                     #print "found",n,s[3]
00144                     chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
00145                     index=0
00146                     splitForThisWf=None
00147                     thisLabel=self.speciallabel
00148                     processStrPrefix=''
00149                     for step in s[3]:
00150                         
00151                         if 'INPUT' in step or (not isinstance(s[2][index],str)):
00152                             nextHasDSInput=s[2][index]
00153 
00154                         else:
00155 
00156                             if (index==0):
00157                                 #first step and not input -> gen part
00158                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
00159                                 try:
00160                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
00161                                 except:
00162                                     print "Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run or the cfg was not created"
00163                                     return -15
00164 
00165                                 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
00166                                 if not '--relval' in s[2][index]:
00167                                     print 'Impossible to create task from scratch without splitting information with --relval'
00168                                     return -12
00169                                 else:
00170                                     arg=s[2][index].split()
00171                                     ns=map(int,arg[arg.index('--relval')+1].split(','))
00172                                     chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
00173                                     chainDict['nowmTasklist'][-1]['SplittingArguments']['events_per_job'] = ns[1]
00174                                 if 'FASTSIM' in s[2][index]:
00175                                     thisLabel+='_FastSim'
00176 
00177                             elif nextHasDSInput:
00178                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
00179                                 try:
00180                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
00181                                 except:
00182                                     print "Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run or the cfg was not created"
00183                                     return -15
00184                                 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
00185                                 splitForThisWf=nextHasDSInput.split
00186                                 chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf
00187                                 if step in wmsplit:
00188                                     chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=wmsplit[step]
00189                                 # get the run numbers or #events
00190                                 if len(nextHasDSInput.run):
00191                                     chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
00192                                 #print "what is s",s[2][index]
00193                                 if '--data' in s[2][index] and nextHasDSInput.label:
00194                                     thisLabel+='_RelVal_%s'%nextHasDSInput.label
00195                                 if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
00196                                     print "This has an input DS and a filter sequence: very likely to be the PyQuen sample"
00197                                     processStrPrefix='PU_'
00198                                     chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
00199                                 nextHasDSInput=None
00200                             else:
00201                                 #not first step and no inputDS
00202                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
00203                                 try:
00204                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
00205                                 except:
00206                                     print "Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run or the cfg was not created"
00207                                     return -15
00208                                 if splitForThisWf:
00209                                     chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf
00210                                 if step in wmsplit:
00211                                     chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=wmsplit[step]
00212 
00213                             #print step
00214                             chainDict['nowmTasklist'][-1]['TaskName']=step
00215                             chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
00216                             chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
00217                             chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
00218                             if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
00219                                 chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
00220                             if '--pileup' in s[2][index]:
00221                                 processStrPrefix='PU_'
00222                             if acqEra:
00223                                 #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
00224                                 chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
00225                                 chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
00226                             else:
00227                                 #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
00228                                 chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
00229                                 chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel
00230 
00231                         index+=1
00232                         
00233             #wrap up for this one
00234             import pprint
00235             #print 'wrapping up'
00236             #pprint.pprint(chainDict)
00237             #loop on the task list
00238             for i_second in reversed(range(len(chainDict['nowmTasklist']))):
00239                 t_second=chainDict['nowmTasklist'][i_second]
00240                 #print "t_second taskname", t_second['TaskName']
00241                 if 'primary' in t_second['nowmIO']:
00242                     #print t_second['nowmIO']['primary']
00243                     primary=t_second['nowmIO']['primary'][0].replace('file:','')
00244                     for i_input in reversed(range(0,i_second)):
00245                         t_input=chainDict['nowmTasklist'][i_input]
00246                         for (om,o) in t_input['nowmIO'].items():
00247                             if primary in o:
00248                                 #print "found",primary,"produced by",om,"of",t_input['TaskName']
00249                                 t_second['InputTask'] = t_input['TaskName']
00250                                 t_second['InputFromOutputModule'] = om
00251                                 #print 't_second',pprint.pformat(t_second)
00252                                 if t_second['TaskName'].startswith('HARVEST'):
00253                                     chainDict.update(copy.deepcopy(self.defaultHarvest))
00254                                     chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
00255                                     ## the info are not in the task specific dict but in the general dict
00256                                     #t_input.update(copy.deepcopy(self.defaultHarvest))
00257                                     #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
00258                                 break
00259 
00260             ## there is in fact only one acquisition era
00261             #if len(set(chainDict['AcquisitionEra'].values()))==1:
00262             #    print "setting only one acq"
00263             if acqEra:
00264                 chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0]
00265                 
00266             ## clean things up now
00267             itask=0
00268             if self.keep:
00269                 for i in self.keep:
00270                     if type(i)==int and i < len(chainDict['nowmTasklist']):
00271                         chainDict['nowmTasklist'][i]['KeepOutput']=True
00272             for (i,t) in enumerate(chainDict['nowmTasklist']):
00273                 if t['TaskName'].startswith('HARVEST'):
00274                     continue
00275                 if not self.keep:
00276                     t['KeepOutput']=True
00277                 elif t['TaskName'] in self.keep:
00278                     t['KeepOutput']=True
00279                 t.pop('nowmIO')
00280                 itask+=1
00281                 chainDict['Task%d'%(itask)]=t
00282 
00283 
00284             ## 
00285 
00286 
00287             ## provide the number of tasks
00288             chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
00289             
00290             chainDict.pop('nowmTasklist')
00291             self.chainDicts[n]=chainDict
00292 
00293             
00294         return 0
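
Two pieces of prepare are easy to misread in the listing: how the '--relval N,M' option becomes RequestNumEvents and events_per_job, and how each task finds its InputTask by matching its primary input file against the outputs of earlier tasks. The following is a minimal Python 3 sketch of both, assuming simplified inputs. The function names parse_relval and link_tasks, the sample command, and the contents of the nowmIO dictionaries are hypothetical and are not taken from real .io files.

```python
def parse_relval(command):
    """Return (RequestNumEvents, events_per_job) from a '--relval N,M' option."""
    args = command.split()
    if "--relval" not in args:
        raise ValueError("no splitting information: --relval is missing")
    ns = [int(v) for v in args[args.index("--relval") + 1].split(",")]
    return ns[0], ns[1]


def link_tasks(tasks):
    """Set InputTask and InputFromOutputModule by matching files, last task first."""
    for i_second in reversed(range(len(tasks))):
        t_second = tasks[i_second]
        if "primary" not in t_second["nowmIO"]:
            continue
        # The first primary input file, without its 'file:' prefix.
        primary = t_second["nowmIO"]["primary"][0].replace("file:", "")
        # Look through the earlier tasks, nearest one first.
        for i_input in reversed(range(i_second)):
            t_input = tasks[i_input]
            for om, outputs in t_input["nowmIO"].items():
                if primary in outputs:
                    t_second["InputTask"] = t_input["TaskName"]
                    t_second["InputFromOutputModule"] = om
                    break  # leaves only the loop over output modules
    return tasks


# Hypothetical two-step chain: GEN writes step1.root, RECO reads it.
tasks = [
    {"TaskName": "GEN", "nowmIO": {"GT": "TAG::All", "RAWSIMoutput": ["step1.root"]}},
    {"TaskName": "RECO", "nowmIO": {"GT": "TAG::All", "primary": ["file:step1.root"]}},
]
link_tasks(tasks)
print(tasks[1]["InputTask"], tasks[1]["InputFromOutputModule"])  # GEN RAWSIMoutput
print(parse_relval("cmsDriver.py step1 --relval 9000,100"))      # (9000, 100)
```

As in the listing, the break only ends the scan of one task's output modules, so the search continues through earlier tasks and a later match overwrites the first one.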

def MatrixInjector::MatrixInjector::submit(self)

Definition at line 344 of file MatrixInjector.py.

00345     def submit(self):
00346         try:
00347             from modules.wma import makeRequest,approveRequest
00348             from wmcontrol import random_sleep
00349             print '\n\tFound wmcontrol\n'
00350         except:
00351             print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
00352             if not self.testMode:
00353                 print '\n\t QUIT\n'
00354                 sys.exit(-17)
00355 
00356         import pprint
00357         for (n,d) in self.chainDicts.items():
00358             if self.testMode:
00359                 print "Only viewing request",n
00360                 pprint.pprint(d)
00361             else:
00362                 #submit to wmagent each dict
00363                 print "For eyes before submitting",n
00364                 pprint.pprint(d)
00365                 print "Submitting",n,"..........."
00366                 workFlow=makeRequest(self.wmagent,d,encodeDict=True)
00367                 approveRequest(self.wmagent,workFlow)
00368                 print "...........",n,"submitted"
00369                 random_sleep()
00370             
00371 
00372         
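
The control flow of submit reduces to a loop that always shows each request dictionary and only contacts the request manager outside test mode. A minimal Python 3 sketch, assuming the two wmcontrol calls are passed in as plain callables: submit_all, make_request and approve_request here are hypothetical stand-ins, not the real modules.wma signatures. pprint.pprint writes to stdout itself and returns None, so it is called without print.

```python
import pprint


def submit_all(chain_dicts, test_mode, make_request=None, approve_request=None):
    """Show every request; submit and approve it only when not in test mode."""
    submitted = []
    for n, d in chain_dicts.items():
        pprint.pprint(d)  # always display the request dictionary
        if test_mode:
            continue  # dry run: nothing is sent
        workflow = make_request(d)
        approve_request(workflow)
        submitted.append(n)
    return submitted


# Dry run with a hypothetical one-entry request table: nothing is submitted.
print(submit_all({1.0: {"RequestType": "TaskChain"}}, test_mode=True))  # []
```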
def MatrixInjector::MatrixInjector::upload(self)

Definition at line 324 of file MatrixInjector.py.

00325     def upload(self):
00326         for (n,d) in self.chainDicts.items():
00327             for it in d:
00328                 if it.startswith("Task") and it!='TaskChain':
00329                     #upload
00330                     couchID=self.uploadConf(d[it]['ConfigCacheID'],
00331                                             str(n)+d[it]['TaskName'],
00332                                             d['CouchURL']
00333                                             )
00334                     print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID
00335                     d[it]['ConfigCacheID']=couchID
00336                 if it =='DQMConfigCacheID':
00337                     couchID=self.uploadConf(d['DQMConfigCacheID'],
00338                                             str(n)+'harvesting',
00339                                             d['CouchURL']
00340                                             )
00341                     print d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID
00342                     d['DQMConfigCacheID']=couchID
00343                         
            
def MatrixInjector::MatrixInjector::uploadConf(self, filePath, label, where)

Definition at line 295 of file MatrixInjector.py.

00296     def uploadConf(self,filePath,label,where):
00297         labelInCouch=self.label+'_'+label
00298         cacheName=filePath.split('/')[-1]
00299         if self.testMode:
00300             self.count+=1
00301             print '\tFake upload of',filePath,'to couch with label',labelInCouch
00302             return self.count
00303         else:
00304             try:
00305                 from modules.wma import upload_to_couch
00306             except:
00307                 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
00308                 print '\n\t QUIT\n'
00309                 sys.exit(-16)
00310             if cacheName in self.couchCache:
00311                 print "Not re-uploading",filePath,"to",where,"for",label
00312                 cacheId=self.couchCache[cacheName]
00313             else:
00314                 print "Loading",filePath,"to",where,"for",label
00315                 cacheId=upload_to_couch(filePath,
00316                                         labelInCouch,
00317                                         self.user,
00318                                         self.group,
00319                                         test_mode=False,
00320                                         url=where
00321                                         )
00322                 self.couchCache[cacheName]=cacheId
00323             return cacheId
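
uploadConf avoids repeated uploads by remembering the returned ID under the file's base name (the last path component). The Python 3 sketch below isolates that caching rule. The class ConfigUploader and the injected upload_fn are hypothetical stand-ins for the real upload_to_couch call. Because the key is the base name only, two files with the same name in different directories share one cached ID, which appears to be the intended way of recycling configurations.

```python
class ConfigUploader:
    """Cache upload IDs by file base name, as uploadConf does with couchCache."""

    def __init__(self, upload_fn):
        self._upload_fn = upload_fn  # hypothetical callable: (file_path, label) -> id
        self._cache = {}

    def upload(self, file_path, label):
        cache_name = file_path.split("/")[-1]  # base name is the cache key
        if cache_name not in self._cache:
            self._cache[cache_name] = self._upload_fn(file_path, label)
        return self._cache[cache_name]


calls = []


def fake_upload(file_path, label):
    """Record the call and hand back a made-up ID."""
    calls.append(file_path)
    return "id%d" % len(calls)


uploader = ConfigUploader(fake_upload)
print(uploader.upload("wf1/step1.py", "1.0step1"))  # id1
print(uploader.upload("wf2/step1.py", "2.0step1"))  # id1 (served from the cache)
print(len(calls))                                   # 1
```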
    

Member Data Documentation

All 18 public attributes listed above are set in __init__.

Definition at line 25 of file MatrixInjector.py.