![]() |
![]() |
Public Member Functions | |
def | __init__ |
def | prepare |
def | submit |
def | upload |
def | uploadConf |
Public Attributes | |
chainDicts | |
couch | |
couchCache | |
couchDB | |
count | |
defaultChain | |
defaultInput | |
defaultScratch | |
defaultTask | |
group | |
label | |
testMode | |
user | |
version | |
wmagent |
Definition at line 24 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
00027 : 00028 self.count=1040 00029 self.testMode=((mode!='submit') and (mode!='force')) 00030 self.version ='v1' 00031 00032 #wagemt stuff 00033 self.wmagent=os.getenv('WMAGENT_REQMGR') 00034 if not self.wmagent: 00035 self.wmagent = 'cmsweb.cern.ch' 00036 00037 #couch stuff 00038 self.couch = 'https://'+self.wmagent+'/couchdb' 00039 self.couchDB = 'reqmgr_config_cache' 00040 self.couchCache={} # so that we do not upload like crazy, and recyle cfgs 00041 self.user = os.getenv('USER') 00042 self.group = 'ppd' 00043 self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_'+self.version 00044 00045 00046 if not os.getenv('WMCORE_ROOT'): 00047 print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n' 00048 if not self.testMode: 00049 print '\n\t QUIT\n' 00050 sys.exit(-18) 00051 else: 00052 print '\n\tFound wmclient\n' 00053 00054 self.defaultChain={ 00055 "RequestType" : "TaskChain", #this is how we handle relvals 00056 "AcquisitionEra": None, #Acq Era 00057 "Requestor": self.user, #Person responsible 00058 "Group": self.group, #group for the request 00059 "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain) 00060 "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain) 00061 "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain) 00062 "GlobalTag": None, #Global Tag (used for all tasks) 00063 "CouchURL": self.couch, #URL of CouchDB containing Config Cache 00064 "CouchDBName": self.couchDB, #Name of Couch Database containing config cache 00065 #- Will contain all configs for all Tasks 00066 "SiteWhitelist" : ["T1_CH_CERN", "T1_US_FNAL"], #Site whitelist 00067 "TaskChain" : None, #Define number of tasks in chain. 
00068 "nowmTasklist" : [], #a list of tasks as we put them in 00069 "unmergedLFNBase" : "/store/unmerged", 00070 "mergedLFNBase" : "/store/relval", 00071 "dashboardActivity" : "relval" 00072 } 00073 00074 self.defaultScratch={ 00075 "TaskName" : None, #Task Name 00076 "ConfigCacheID" : None, #Generator Config id 00077 "SplittingAlgorithm" : "EventBased", #Splitting Algorithm 00078 "SplittingArguments" : {"events_per_job" : None}, #Size of jobs in terms of splitting algorithm 00079 "RequestNumEvents" : None, #Total number of events to generate 00080 "Seeding" : "AutomaticSeeding", #Random seeding method 00081 "PrimaryDataset" : None, #Primary Dataset to be created 00082 "nowmIO": {} 00083 } 00084 self.defaultInput={ 00085 "TaskName" : "DigiHLT", #Task Name 00086 "ConfigCacheID" : None, #Processing Config id 00087 "InputDataset" : None, #Input Dataset to be processed 00088 "SplittingAlgorithm" : "LumiBased", #Splitting Algorithm 00089 "SplittingArguments" : {"lumis_per_job" : 10}, #Size of jobs in terms of splitting algorithm 00090 "nowmIO": {} 00091 } 00092 self.defaultTask={ 00093 "TaskName" : None, #Task Name 00094 "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry) 00095 "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process 00096 "ConfigCacheID" : None, #Processing Config id 00097 "SplittingAlgorithm" : "LumiBased", #Splitting Algorithm 00098 "SplittingArguments" : {"lumis_per_job" : 10}, #Size of jobs in terms of splitting algorithm 00099 "nowmIO": {} 00100 } 00101 00102 self.chainDicts={} 00103
Definition at line 104 of file MatrixInjector.py.
00105 : 00106 00107 for (n,dir) in directories.items(): 00108 chainDict=copy.deepcopy(self.defaultChain) 00109 print "inspecting",dir 00110 nextHasDSInput=None 00111 for (x,s) in mReader.workFlowSteps.items(): 00112 #x has the format (num, prefix) 00113 #s has the format (num, name, commands, stepList) 00114 if x[0]==n: 00115 #print "found",n,s[3] 00116 chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0] 00117 index=0 00118 splitForThisWf=None 00119 for step in s[3]: 00120 if 'INPUT' in step or (not isinstance(s[2][index],str)): 00121 nextHasDSInput=s[2][index] 00122 else: 00123 if 'HARVEST' in step: 00124 continue 00125 if (index==0): 00126 #first step and not input -> gen part 00127 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch)) 00128 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0] 00129 if not '--relval' in s[2][index]: 00130 print 'Impossible to create task from scratch' 00131 return -12 00132 else: 00133 arg=s[2][index].split() 00134 ns=map(int,arg[arg.index('--relval')+1].split(',')) 00135 chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0] 00136 chainDict['nowmTasklist'][-1]['SplittingArguments']['events_per_job'] = ns[1] 00137 elif nextHasDSInput: 00138 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput)) 00139 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet 00140 splitForThisWf=nextHasDSInput.split 00141 chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf 00142 # get the run numbers or #events 00143 if len(nextHasDSInput.run): 00144 chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run 00145 nextHasDSInput=None 00146 else: 00147 #not first step and no inputDS 00148 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask)) 00149 if splitForThisWf: 00150 chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf 00151 #print step 00152 
chainDict['nowmTasklist'][-1]['TaskName']=step 00153 try: 00154 chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00155 except: 00156 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00157 return -15 00158 chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step) 00159 chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] 00160 chainDict['AcquisitionEra']=(chainDict['CMSSWVersion']+'-'+chainDict['GlobalTag']).replace('::All','') 00161 index+=1 00162 00163 #wrap up for this one 00164 #print 'wrapping up' 00165 chainDict['TaskChain']=len(chainDict['nowmTasklist']) 00166 #loop on the task list 00167 for i_second in reversed(range(len(chainDict['nowmTasklist']))): 00168 #for t_second in reversed(chainDict['nowmTasklist']): 00169 t_second=chainDict['nowmTasklist'][i_second] 00170 #print "t_second taskname", t_second['TaskName'] 00171 if 'primary' in t_second['nowmIO']: 00172 #print t_second['nowmIO']['primary'] 00173 primary=t_second['nowmIO']['primary'][0].replace('file:','') 00174 #for t_input in reversed(chainDict['nowmTasklist']): 00175 for i_input in reversed(range(0,i_second)): 00176 t_input=chainDict['nowmTasklist'][i_input] 00177 for (om,o) in t_input['nowmIO'].items(): 00178 if primary in o: 00179 #print "found",primary,"procuced by",om,"of",t_input['TaskName'] 00180 t_second['InputTask'] = t_input['TaskName'] 00181 t_second['InputFromOutputModule'] = om 00182 #print 't_second',t_second 00183 break 00184 for (i,t) in enumerate(chainDict['nowmTasklist']): 00185 t.pop('nowmIO') 00186 chainDict['Task%d'%(i+1)]=t 00187 00188 chainDict.pop('nowmTasklist') 00189 self.chainDicts[n]=chainDict 00190 00191 00192 return 0
def MatrixInjector::MatrixInjector::submit(self)
Definition at line 234 of file MatrixInjector.py.
def submit(self):
    """Show every prepared request and, outside test mode, submit it.

    Iterates over ``self.chainDicts``. In test mode each request
    dictionary is only pretty-printed. Otherwise it is pretty-printed,
    passed to ``makeRequest`` and ``approveRequest`` together with
    ``self.wmagent``, and followed by a ``random_sleep()`` call.

    Raises:
        SystemExit: with code -17 when the wmcontrol modules cannot be
            imported and the injector is not in test mode.
    """
    try:
        from modules.wma import makeRequest, approveRequest
        from wmcontrol import random_sleep
        print('\n\tFound wmcontrol\n')
    except ImportError:
        # Only a failed import is handled here; a bare ``except`` would
        # also swallow KeyboardInterrupt and SystemExit.
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-17)
        # In test mode the imported helpers are never called, so it is
        # safe to carry on without them.

    import pprint
    for (n, d) in self.chainDicts.items():
        if self.testMode:
            print("Only viewing request %s" % (n,))
            # pprint.pprint() writes to stdout itself and returns None;
            # printing its return value would add a stray "None" line.
            pprint.pprint(d)
        else:
            # Show the request before it is sent to the agent.
            print("For eyes before submitting %s" % (n,))
            pprint.pprint(d)
            print("Submitting %s ..........." % (n,))
            workFlow = makeRequest(self.wmagent, d, encodeDict=True)
            approveRequest(self.wmagent, workFlow)
            print("........... %s submitted" % (n,))
            random_sleep()
def MatrixInjector::MatrixInjector::upload(self)
Definition at line 222 of file MatrixInjector.py.
def upload(self):
    """Upload the configuration of every task in every prepared chain.

    For each chain dictionary in ``self.chainDicts``, every key that
    starts with ``"Task"`` (except the ``"TaskChain"`` entry) is treated
    as a task dictionary. Its ``ConfigCacheID`` is handed to
    ``self.uploadConf`` and then overwritten in place with the value
    that call returns.
    """
    for (n, d) in self.chainDicts.items():
        for it in d:
            # 'TaskChain' shares the prefix but is not a task dictionary.
            if it == 'TaskChain' or not it.startswith("Task"):
                continue
            task = d[it]
            configPath = task['ConfigCacheID']
            # The label passed on is the chain key followed by the task name.
            couchID = self.uploadConf(configPath,
                                      str(n) + task['TaskName'],
                                      d['CouchURL'])
            # The double space reproduces the output of the original
            # comma-separated print statement.
            print("%s  uploaded to couchDB for %s with ID %s" % (configPath, n, couchID))
            task['ConfigCacheID'] = couchID
def MatrixInjector::MatrixInjector::uploadConf(self, filePath, label, where)
Definition at line 193 of file MatrixInjector.py.
def uploadConf(self, filePath, label, where):
    """Upload one configuration file to couch and return its identifier.

    Args:
        filePath: path of the configuration file; its last
            '/'-separated component is the key used in
            ``self.couchCache``.
        label: suffix appended to ``self.label`` (joined with '_') to
            form the label passed to ``upload_to_couch``.
        where: value passed to ``upload_to_couch`` as ``url``.

    Returns:
        In test mode, the incremented ``self.count`` (nothing is
        uploaded). Otherwise the identifier returned by
        ``upload_to_couch``, or the cached identifier when a file with
        the same base name was uploaded before.

    Raises:
        SystemExit: with code -16 when not in test mode and
            ``modules.wma`` cannot be imported.
    """
    labelInCouch = self.label + '_' + label
    cacheName = filePath.split('/')[-1]

    if self.testMode:
        # Fake upload: hand out a fresh counter value as the identifier.
        self.count += 1
        print('\tFake upload to couch with label %s' % (labelInCouch,))
        return self.count

    try:
        from modules.wma import upload_to_couch
    except ImportError:
        # Only a failed import is handled here; a bare ``except`` would
        # also swallow KeyboardInterrupt and SystemExit.
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        print('\n\t QUIT\n')
        sys.exit(-16)

    # NOTE(review): the cache is keyed on the file's base name only, so
    # files in different directories with the same name share one entry.
    # The "recycle cfgs" comment in __init__ suggests this is intended;
    # confirm.
    if cacheName in self.couchCache:
        print("Not re-uploading %s to %s for %s" % (filePath, where, label))
        return self.couchCache[cacheName]

    print("Loading %s to %s for %s" % (filePath, where, label))
    cacheId = upload_to_couch(filePath,
                              labelInCouch,
                              self.user,
                              self.group,
                              test_mode=False,
                              url=where)
    self.couchCache[cacheName] = cacheId
    return cacheId
chainDicts: Definition at line 26 of file MatrixInjector.py.
couch: Definition at line 26 of file MatrixInjector.py.
couchCache: Definition at line 26 of file MatrixInjector.py.
couchDB: Definition at line 26 of file MatrixInjector.py.
count: Definition at line 26 of file MatrixInjector.py.
defaultChain: Definition at line 26 of file MatrixInjector.py.
defaultInput: Definition at line 26 of file MatrixInjector.py.
defaultScratch: Definition at line 26 of file MatrixInjector.py.
defaultTask: Definition at line 26 of file MatrixInjector.py.
group: Definition at line 26 of file MatrixInjector.py.
label: Definition at line 26 of file MatrixInjector.py.
testMode: Definition at line 26 of file MatrixInjector.py.
user: Definition at line 26 of file MatrixInjector.py.
version: Definition at line 26 of file MatrixInjector.py.
wmagent: Definition at line 26 of file MatrixInjector.py.