

MatrixInjector::MatrixInjector Class Reference


Public Member Functions

def __init__
def prepare
def submit
def upload
def uploadConf

Public Attributes

 chainDicts
 couch
 couchCache
 couchDB
 count
 defaultChain
 defaultInput
 defaultScratch
 defaultTask
 group
 label
 testMode
 user
 version
 wmagent

Detailed Description

Definition at line 24 of file MatrixInjector.py.


Constructor & Destructor Documentation

def MatrixInjector::MatrixInjector::__init__ (self, mode = 'init')

Definition at line 26 of file MatrixInjector.py.

    def __init__(self, mode='init'):
        self.count=1040
        self.testMode=((mode!='submit') and (mode!='force'))
        self.version ='v1'

        #wmagent stuff
        self.wmagent=os.getenv('WMAGENT_REQMGR')
        if not self.wmagent:
            self.wmagent = 'cmsweb.cern.ch'

        #couch stuff
        self.couch = 'https://'+self.wmagent+'/couchdb'
        self.couchDB = 'reqmgr_config_cache'
        self.couchCache={} # so that we do not upload like crazy, and recycle cfgs
        self.user = os.getenv('USER')
        self.group = 'ppd'
        self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_'+self.version

        if not os.getenv('WMCORE_ROOT'):
            print '\n\twmclient is not set up properly. Will not be able to upload or submit requests.\n'
            if not self.testMode:
                print '\n\t QUIT\n'
                sys.exit(-18)
        else:
            print '\n\tFound wmclient\n'

        self.defaultChain={
            "RequestType" : "TaskChain",                      #this is how we handle relvals
            "AcquisitionEra": None,                           #Acq Era
            "Requestor": self.user,                           #Person responsible
            "Group": self.group,                              #group for the request
            "CMSSWVersion": os.getenv('CMSSW_VERSION'),       #CMSSW Version (used for all tasks in chain)
            "ScramArch": os.getenv('SCRAM_ARCH'),             #Scram Arch (used for all tasks in chain)
            "ProcessingVersion": self.version,                #Processing Version (used for all tasks in chain)
            "GlobalTag": None,                                #Global Tag (used for all tasks)
            "CouchURL": self.couch,                           #URL of CouchDB containing Config Cache
            "CouchDBName": self.couchDB,                      #Name of Couch Database containing config cache
            #- Will contain all configs for all Tasks
            "SiteWhitelist" : ["T1_CH_CERN", "T1_US_FNAL"],   #Site whitelist
            "TaskChain" : None,                               #Define number of tasks in chain.
            "nowmTasklist" : [],                              #a list of tasks as we put them in
            "unmergedLFNBase" : "/store/unmerged",
            "mergedLFNBase" : "/store/relval",
            "dashboardActivity" : "relval"
            }

        self.defaultScratch={
            "TaskName" : None,                                #Task Name
            "ConfigCacheID" : None,                           #Generator Config id
            "SplittingAlgorithm"  : "EventBased",             #Splitting Algorithm
            "SplittingArguments" : {"events_per_job" : None}, #Size of jobs in terms of splitting algorithm
            "RequestNumEvents" : None,                        #Total number of events to generate
            "Seeding" : "AutomaticSeeding",                   #Random seeding method
            "PrimaryDataset" : None,                          #Primary Dataset to be created
            "nowmIO": {}
            }
        self.defaultInput={
            "TaskName" : "DigiHLT",                           #Task Name
            "ConfigCacheID" : None,                           #Processing Config id
            "InputDataset" : None,                            #Input Dataset to be processed
            "SplittingAlgorithm"  : "LumiBased",              #Splitting Algorithm
            "SplittingArguments" : {"lumis_per_job" : 10},    #Size of jobs in terms of splitting algorithm
            "nowmIO": {}
            }
        self.defaultTask={
            "TaskName" : None,                                #Task Name
            "InputTask" : None,                               #Input Task Name (Task Name field of a previous Task entry)
            "InputFromOutputModule" : None,                   #OutputModule name in the input task that will provide files to process
            "ConfigCacheID" : None,                           #Processing Config id
            "SplittingAlgorithm"  : "LumiBased",              #Splitting Algorithm
            "SplittingArguments" : {"lumis_per_job" : 10},    #Size of jobs in terms of splitting algorithm
            "nowmIO": {}
            }

        self.chainDicts={}
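
The mode handling, host fallback and label construction in the constructor can be sketched in isolation. This is a minimal illustration and not part of MatrixInjector.py: the helper names is_test_mode, resolve_wmagent and build_label are hypothetical, the environment is passed in as a plain dict instead of being read with os.getenv, and the release name in the example is a made-up placeholder.

```python
def is_test_mode(mode):
    """Same expression as testMode: only 'submit' and 'force' are live modes."""
    return mode != 'submit' and mode != 'force'


def resolve_wmagent(env, default='cmsweb.cern.ch'):
    """Use WMAGENT_REQMGR if set and non-empty, otherwise the default host."""
    return env.get('WMAGENT_REQMGR') or default


def build_label(env, version='v1'):
    """Same expression as label: dashes are stripped from the release name."""
    return 'RelValSet_' + env['CMSSW_VERSION'].replace('-', '') + '_' + version


if __name__ == '__main__':
    env = {'CMSSW_VERSION': 'CMSSW_X_Y_Z-test'}  # placeholder release name
    print(is_test_mode('init'))      # any mode other than submit/force is a dry run
    print(resolve_wmagent(env))      # falls back to the default host
    print(build_label(env))
```

In the listing itself, os.getenv('CMSSW_VERSION') returns None when the variable is unset, so the .replace call would raise AttributeError before the WMCORE_ROOT check is reached; the sketch raises KeyError in the same situation.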


Member Function Documentation

def MatrixInjector::MatrixInjector::prepare (self, mReader, directories, mode = 'init')

Definition at line 104 of file MatrixInjector.py.

    def prepare(self, mReader, directories, mode='init'):

        for (n,dir) in directories.items():
            chainDict=copy.deepcopy(self.defaultChain)
            print "inspecting",dir
            nextHasDSInput=None
            for (x,s) in mReader.workFlowSteps.items():
                #x has the format (num, prefix)
                #s has the format (num, name, commands, stepList)
                if x[0]==n:
                    #print "found",n,s[3]
                    chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
                    index=0
                    splitForThisWf=None
                    for step in s[3]:
                        if 'INPUT' in step or (not isinstance(s[2][index],str)):
                            nextHasDSInput=s[2][index]
                        else:
                            if 'HARVEST' in step:
                                continue
                            if (index==0):
                                #first step and not input -> gen part
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
                                chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
                                if not '--relval' in s[2][index]:
                                    print 'Impossible to create task from scratch'
                                    return -12
                                else:
                                    arg=s[2][index].split()
                                    ns=map(int,arg[arg.index('--relval')+1].split(','))
                                    chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
                                    chainDict['nowmTasklist'][-1]['SplittingArguments']['events_per_job'] = ns[1]
                            elif nextHasDSInput:
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
                                chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
                                splitForThisWf=nextHasDSInput.split
                                chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf
                                # get the run numbers or #events
                                if len(nextHasDSInput.run):
                                    chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
                                nextHasDSInput=None
                            else:
                                #not first step and no inputDS
                                chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
                                if splitForThisWf:
                                    chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf
                            #print step
                            chainDict['nowmTasklist'][-1]['TaskName']=step
                            try:
                                chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
                            except:
                                print "Failed to find",'%s/%s.io'%(dir,step),". The workflows were probably not run or the cfg was not created"
                                return -15
                            chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
                            chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT']
                            chainDict['AcquisitionEra']=(chainDict['CMSSWVersion']+'-'+chainDict['GlobalTag']).replace('::All','')
                        index+=1

            #wrap up for this one
            #print 'wrapping up'
            chainDict['TaskChain']=len(chainDict['nowmTasklist'])
            #loop on the task list
            for i_second in reversed(range(len(chainDict['nowmTasklist']))):
            #for t_second in reversed(chainDict['nowmTasklist']):
                t_second=chainDict['nowmTasklist'][i_second]
                #print "t_second taskname", t_second['TaskName']
                if 'primary' in t_second['nowmIO']:
                    #print t_second['nowmIO']['primary']
                    primary=t_second['nowmIO']['primary'][0].replace('file:','')
                    #for t_input in reversed(chainDict['nowmTasklist']):
                    for i_input in reversed(range(0,i_second)):
                        t_input=chainDict['nowmTasklist'][i_input]
                        for (om,o) in t_input['nowmIO'].items():
                            if primary in o:
                                #print "found",primary,"produced by",om,"of",t_input['TaskName']
                                t_second['InputTask'] = t_input['TaskName']
                                t_second['InputFromOutputModule'] = om
                                #print 't_second',t_second
                                break
            for (i,t) in enumerate(chainDict['nowmTasklist']):
                t.pop('nowmIO')
                chainDict['Task%d'%(i+1)]=t

            chainDict.pop('nowmTasklist')
            self.chainDicts[n]=chainDict

        return 0
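
Three pieces of prepare can be sketched on their own: reading the event counts from a '--relval N,M' option, linking each task to the task that produced its primary input, and flattening the working list into Task1..TaskN entries. This is a simplified illustration, not the code from MatrixInjector.py. The function names are hypothetical, and the layout assumed for the .io content (output module name mapped to a list of file names, plus 'primary' and 'GT' entries) is inferred from how the listing uses it.

```python
def parse_relval(command):
    """Return (total_events, events_per_job) from a '--relval N,M' option."""
    args = command.split()
    if '--relval' not in args:
        raise ValueError('no --relval option: cannot build a task from scratch')
    values = [int(v) for v in args[args.index('--relval') + 1].split(',')]
    return values[0], values[1]


def link_tasks(tasks):
    """Point each task at the nearest earlier task that produced its primary input."""
    for i_second in reversed(range(len(tasks))):
        second = tasks[i_second]
        if 'primary' not in second['nowmIO']:
            continue
        primary = second['nowmIO']['primary'][0].replace('file:', '')
        for i_input in reversed(range(i_second)):
            producer = tasks[i_input]
            # Assumption: every key other than 'primary' and 'GT' is an output module.
            modules = [om for om, files in sorted(producer['nowmIO'].items())
                       if om not in ('primary', 'GT') and primary in files]
            if modules:
                second['InputTask'] = producer['TaskName']
                second['InputFromOutputModule'] = modules[0]
                break  # nearest producer found; stop looking further back


def flatten(chain):
    """Replace the working task list by numbered Task1..TaskN entries."""
    tasks = chain.pop('nowmTasklist')
    chain['TaskChain'] = len(tasks)
    for i, task in enumerate(tasks):
        task.pop('nowmIO')
        chain['Task%d' % (i + 1)] = task
    return chain


if __name__ == '__main__':
    chain = {'RequestType': 'TaskChain', 'nowmTasklist': [
        {'TaskName': 'GEN',
         'nowmIO': {'GT': 'TAG::All', 'RAWSIMoutput': ['step1.root']}},
        {'TaskName': 'RECO', 'InputTask': None, 'InputFromOutputModule': None,
         'nowmIO': {'GT': 'TAG::All', 'primary': ['file:step1.root'],
                    'RECOoutput': ['step2.root']}},
    ]}
    link_tasks(chain['nowmTasklist'])
    print(flatten(chain))
```

The sketch differs from the listing in two places. In the listing the break statement leaves only the loop over output modules, so the search continues through earlier tasks and a later match overwrites the first one; the sketch stops at the nearest producer. The listing also tests every entry of nowmIO, including 'primary' and 'GT', whereas the sketch skips those two keys.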

def MatrixInjector::MatrixInjector::submit (self)

Definition at line 234 of file MatrixInjector.py.

    def submit(self):
        try:
            from modules.wma import makeRequest,approveRequest
            from wmcontrol import random_sleep
            print '\n\tFound wmcontrol\n'
        except:
            print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
            if not self.testMode:
                print '\n\t QUIT\n'
                sys.exit(-17)

        import pprint
        for (n,d) in self.chainDicts.items():
            if self.testMode:
                print "Only viewing request",n
                pprint.pprint(d)
            else:
                #submit to wmagent each dict
                print "For eyes before submitting",n
                pprint.pprint(d)
                print "Submitting",n,"..........."
                workFlow=makeRequest(self.wmagent,d,encodeDict=True)
                approveRequest(self.wmagent,workFlow)
                print "...........",n,"submitted"
                random_sleep()

def MatrixInjector::MatrixInjector::upload (self)

Definition at line 222 of file MatrixInjector.py.

    def upload(self):
        for (n,d) in self.chainDicts.items():
            for it in d:
                if it.startswith("Task") and it!='TaskChain':
                    #upload
                    couchID=self.uploadConf(d[it]['ConfigCacheID'],
                                            str(n)+d[it]['TaskName'],
                                            d['CouchURL']
                                            )
                    print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID
                    d[it]['ConfigCacheID']=couchID
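
The key filter in upload relies on prepare having stored every task under 'Task1', 'Task2', ... next to the 'TaskChain' counter. A minimal sketch of that selection and of the in-place replacement of ConfigCacheID follows. The names task_keys and replace_config_ids are hypothetical, and the uploader is any callable supplied by the caller, standing in for uploadConf.

```python
def task_keys(chain):
    """Keys holding per-task dictionaries: 'Task1', 'Task2', ... but not 'TaskChain'."""
    keys = [k for k in chain if k.startswith('Task') and k != 'TaskChain']
    return sorted(keys, key=lambda k: int(k[len('Task'):]))


def replace_config_ids(chain, uploader):
    """Swap each task's config file path for the ID returned by the uploader."""
    for key in task_keys(chain):
        task = chain[key]
        task['ConfigCacheID'] = uploader(task['ConfigCacheID'])
    return chain


if __name__ == '__main__':
    chain = {
        'TaskChain': 2,
        'CouchURL': 'https://example.invalid/couchdb',  # placeholder URL
        'Task1': {'TaskName': 'GEN', 'ConfigCacheID': 'wf/GEN.py'},
        'Task2': {'TaskName': 'RECO', 'ConfigCacheID': 'wf/RECO.py'},
    }
    replace_config_ids(chain, lambda path: 'id:' + path)
    print(chain['Task1']['ConfigCacheID'], chain['Task2']['ConfigCacheID'])
```

The listing iterates over the dictionary in whatever order it yields its keys; the numeric sort here is only to make the sketch's behaviour predictable.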
            
def MatrixInjector::MatrixInjector::uploadConf (self, filePath, label, where)

Definition at line 193 of file MatrixInjector.py.

    def uploadConf(self, filePath, label, where):
        labelInCouch=self.label+'_'+label
        cacheName=filePath.split('/')[-1]
        if self.testMode:
            self.count+=1
            print '\tFake upload to couch with label',labelInCouch
            return self.count
        else:
            try:
                from modules.wma import upload_to_couch
            except:
                print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n'
                print '\n\t QUIT\n'
                sys.exit(-16)
            if cacheName in self.couchCache:
                print "Not re-uploading",filePath,"to",where,"for",label
                cacheId=self.couchCache[cacheName]
            else:
                print "Loading",filePath,"to",where,"for",label
                cacheId=upload_to_couch(filePath,
                                        labelInCouch,
                                        self.user,
                                        self.group,
                                        test_mode=False,
                                        url=where
                                        )
                self.couchCache[cacheName]=cacheId
            return cacheId
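
The caching behaviour of uploadConf can be illustrated without the wmcontrol modules. The sketch below is an assumption-laden stand-in, not the real implementation: ConfigUploader is a hypothetical class, and the upload argument is any callable that takes (file_path, label, where) and returns an ID, used here in place of upload_to_couch.

```python
class ConfigUploader(object):
    """Dry-run counter in test mode, basename-keyed cache otherwise."""

    def __init__(self, upload, test_mode=True, first_fake_id=1040):
        self.upload = upload          # stand-in for the real upload call
        self.test_mode = test_mode
        self.count = first_fake_id    # same starting value as self.count
        self.cache = {}               # plays the role of couchCache

    def __call__(self, file_path, label, where):
        cache_name = file_path.split('/')[-1]
        if self.test_mode:
            # Nothing is uploaded; each call hands out the next fake ID.
            self.count += 1
            return self.count
        if cache_name not in self.cache:
            self.cache[cache_name] = self.upload(file_path, label, where)
        return self.cache[cache_name]


if __name__ == '__main__':
    uploaded = []

    def fake_upload(file_path, label, where):
        uploaded.append(file_path)
        return 'id-%d' % len(uploaded)

    live = ConfigUploader(fake_upload, test_mode=False)
    print(live('1.0/GEN.py', 'a', 'url'), live('2.0/GEN.py', 'b', 'url'))
    print(uploaded)
```

Because the cache key is the file name without its directory, two workflows that both contain a step file with the same name share one uploaded configuration, as the second call in the example shows. In test mode the cache is bypassed, so repeated calls for the same file return different fake IDs.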
    

Member Data Documentation

All public attributes listed above (chainDicts, couch, couchCache, couchDB, count, defaultChain, defaultInput, defaultScratch, defaultTask, group, label, testMode, user, version, wmagent) are set in __init__.

Definition at line 26 of file MatrixInjector.py.