Class reference: MatrixInjector::MatrixInjector (diagram images are missing from this extraction)
Public Member Functions:
- def __init__
- def prepare
- def submit
- def upload
- def uploadConf
Public Attributes:
- chainDicts
- couch
- couchCache
- couchDB
- count
- defaultChain
- defaultHarvest
- defaultInput
- defaultScratch
- defaultTask
- group
- keep
- label
- speciallabel
- testMode
- user
- version
- wmagent
Definition at line 23 of file MatrixInjector.py.
Definition at line 25 of file MatrixInjector.py.
# ---------------------------------------------------------------------------
# MatrixInjector.__init__ -- Doxygen listing of MatrixInjector.py lines 26-123
# (the def line itself is not shown; indentation was lost in extraction).
# Purpose: set up connection settings and the request templates that
# prepare() fills in and upload()/submit() later send.
#  * Reads opt.keep, opt.label and `mode`: testMode is True unless mode is
#    'submit' or 'force'.
#  * count starts at 1040; uploadConf() increments it to hand out fake IDs
#    in test mode.
#  * wmagent: $WMAGENT_REQMGR, falling back to 'cmsweb.cern.ch'; couch is
#    'https://<wmagent>/couchdb' and couchDB is 'reqmgr_config_cache'.
#  * couchCache: config-file base name -> CouchDB ID, so uploadConf() does
#    not re-upload a config it has already uploaded.
#  * label: 'RelValSet_' + $CMSSW_VERSION (dashes removed) + '_v' + version;
#    speciallabel is '_' + opt.label when a label is given, else ''.
#  * If $WMCORE_ROOT is unset a warning is printed and, outside test mode,
#    the process exits with status -18.
#  * defaultChain / defaultHarvest / defaultScratch / defaultInput /
#    defaultTask are templates that prepare() deep-copies: the request
#    itself, DQM harvesting settings, a first step generated from scratch
#    (EventBased splitting), a step reading an input dataset (LumiBased),
#    and a later chained step (LumiBased).  Keys prefixed 'nowm' are local
#    bookkeeping that prepare() removes before storing the request.
#  * chainDicts starts empty; prepare() stores one request per workflow.
# NOTE(review): os.getenv('CMSSW_VERSION') returns None when the variable is
# unset, so building self.label then fails with AttributeError.
# ---------------------------------------------------------------------------
00026 : 00027 self.count=1040 00028 self.testMode=((mode!='submit') and (mode!='force')) 00029 self.version =1 00030 self.keep = opt.keep 00031 00032 #wagemt stuff 00033 self.wmagent=os.getenv('WMAGENT_REQMGR') 00034 if not self.wmagent: 00035 self.wmagent = 'cmsweb.cern.ch' 00036 00037 #couch stuff 00038 self.couch = 'https://'+self.wmagent+'/couchdb' 00039 self.couchDB = 'reqmgr_config_cache' 00040 self.couchCache={} # so that we do not upload like crazy, and recyle cfgs 00041 self.user = os.getenv('USER') 00042 self.group = 'ppd' 00043 self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version) 00044 self.speciallabel='' 00045 if opt.label: 00046 self.speciallabel= '_'+opt.label 00047 00048 00049 if not os.getenv('WMCORE_ROOT'): 00050 print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n' 00051 if not self.testMode: 00052 print '\n\t QUIT\n' 00053 sys.exit(-18) 00054 else: 00055 print '\n\tFound wmclient\n' 00056 00057 self.defaultChain={ 00058 "RequestType" : "TaskChain", #this is how we handle relvals 00059 "AcquisitionEra": {}, #Acq Era 00060 "ProcessingString": {}, # processing string to label the dataset 00061 "Requestor": self.user, #Person responsible 00062 "Group": self.group, #group for the request 00063 "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain) 00064 "Campaign": os.getenv('CMSSW_VERSION'), # only for wmstat purpose 00065 "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain) 00066 "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain) 00067 "GlobalTag": None, #Global Tag (overridden per task) 00068 "CouchURL": self.couch, #URL of CouchDB containing Config Cache 00069 "CouchDBName": self.couchDB, #Name of Couch Database containing config cache 00070 #- Will contain all configs for all Tasks 00071 "SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist 00072 "TaskChain" : None, 
#Define number of tasks in chain. 00073 "nowmTasklist" : [], #a list of tasks as we put them in 00074 "unmergedLFNBase" : "/store/unmerged", 00075 "mergedLFNBase" : "/store/relval", 00076 "dashboardActivity" : "relval", 00077 "Memory" : 2400, 00078 "SizePerEvent" : 1234, 00079 "TimePerEvent" : 20 00080 } 00081 00082 self.defaultHarvest={ 00083 "EnableDQMHarvest" : 1, 00084 "DQMUploadUrl" : "https://cmsweb.cern.ch/dqm/relval", 00085 "DQMConfigCacheID" : None 00086 } 00087 00088 self.defaultScratch={ 00089 "TaskName" : None, #Task Name 00090 "ConfigCacheID" : None, #Generator Config id 00091 "GlobalTag": None, 00092 "SplittingAlgorithm" : "EventBased", #Splitting Algorithm 00093 "SplittingArguments" : {"events_per_job" : None}, #Size of jobs in terms of splitting algorithm 00094 "RequestNumEvents" : None, #Total number of events to generate 00095 "Seeding" : "AutomaticSeeding", #Random seeding method 00096 "PrimaryDataset" : None, #Primary Dataset to be created 00097 "nowmIO": {}, 00098 "KeepOutput" : False 00099 } 00100 self.defaultInput={ 00101 "TaskName" : "DigiHLT", #Task Name 00102 "ConfigCacheID" : None, #Processing Config id 00103 "GlobalTag": None, 00104 "InputDataset" : None, #Input Dataset to be processed 00105 "SplittingAlgorithm" : "LumiBased", #Splitting Algorithm 00106 "SplittingArguments" : {"lumis_per_job" : 10}, #Size of jobs in terms of splitting algorithm 00107 "nowmIO": {}, 00108 "KeepOutput" : False 00109 } 00110 self.defaultTask={ 00111 "TaskName" : None, #Task Name 00112 "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry) 00113 "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process 00114 "ConfigCacheID" : None, #Processing Config id 00115 "GlobalTag": None, 00116 "SplittingAlgorithm" : "LumiBased", #Splitting Algorithm 00117 "SplittingArguments" : {"lumis_per_job" : 10}, #Size of jobs in terms of splitting algorithm 00118 "nowmIO": {}, 00119 "KeepOutput" : False 00120 
} 00121 00122 self.chainDicts={} 00123
Definition at line 124 of file MatrixInjector.py.
# ---------------------------------------------------------------------------
# MatrixInjector.prepare -- Doxygen listing of MatrixInjector.py lines
# 125-294 (the def line is not shown; indentation was lost in extraction,
# so the nesting described below is inferred from statement order).
# Purpose: build one TaskChain request dict per workflow and store it in
# self.chainDicts[n].  Inputs visible in the body:
#  * directories: mapping workflow number n -> directory holding, per step,
#    '<step>.io' (JSON, read with json.loads) and '<step>.py' (recorded as
#    the task's ConfigCacheID until upload() replaces it with a CouchDB ID).
#  * mReader.workFlowSteps: items (x, s) with x = (num, prefix) and
#    s = (num, name, commands, stepList), per the inline comments.
# Returns 0 on success, -15 if a '<step>.io' file cannot be read, and -12
# if the first (generation) step has no '--relval N,M' option.
# Per step of a workflow:
#  * a step whose name contains 'INPUT', or whose command is not a str, only
#    records the dataset description (used via .dataSet, .split, .run and
#    .label) for the following step;
#  * the step at index 0, when not an input step, starts from
#    defaultScratch: RequestNumEvents = N, events_per_job = M;
#  * a step following an input description starts from defaultInput;
#  * any other step starts from defaultTask.
#  lumis_per_job may be overridden per step name by wmsplit, imported from
#  Configuration.PyReleaseValidation.relval_steps when that import works.
#  Each task's GlobalTag comes from the 'GT' entry of its .io file; the
#  request-level GlobalTag ends up as the last task's.  ProcessingString is
#  processStrPrefix ('PU_' for '--pileup' or a filter-bearing input step) +
#  GlobalTag without '::All' + the workflow label.
# Afterwards each task whose .io lists a 'primary' file is linked
# (InputTask / InputFromOutputModule) to an earlier task whose outputs
# contain that file; a HARVEST task linked this way merges defaultHarvest
# into the request, with DQMConfigCacheID set to its config path.  HARVEST
# tasks are not emitted as TaskN entries; TaskChain counts emitted tasks.
# KeepOutput: with an empty self.keep every emitted task keeps its output;
# otherwise int entries select tasks by list position and other entries
# select tasks by TaskName.
# NOTE(review): acqEra is hard-coded False, so AcquisitionEra and
# ProcessingString are always set per task rather than per request.
# NOTE(review): the bare 'except:' clauses hide the real error, and the
# files opened for the .io reads are never explicitly closed.
# ---------------------------------------------------------------------------
00125 : 00126 try: 00127 from Configuration.PyReleaseValidation.relval_steps import wmsplit 00128 import pprint 00129 pprint.pprint(wmsplit) 00130 except: 00131 print "Not set up for step splitting" 00132 wmsplit={} 00133 00134 acqEra=False 00135 for (n,dir) in directories.items(): 00136 chainDict=copy.deepcopy(self.defaultChain) 00137 print "inspecting",dir 00138 nextHasDSInput=None 00139 for (x,s) in mReader.workFlowSteps.items(): 00140 #x has the format (num, prefix) 00141 #s has the format (num, name, commands, stepList) 00142 if x[0]==n: 00143 #print "found",n,s[3] 00144 chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0] 00145 index=0 00146 splitForThisWf=None 00147 thisLabel=self.speciallabel 00148 processStrPrefix='' 00149 for step in s[3]: 00150 00151 if 'INPUT' in step or (not isinstance(s[2][index],str)): 00152 nextHasDSInput=s[2][index] 00153 00154 else: 00155 00156 if (index==0): 00157 #first step and not input -> gen part 00158 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch)) 00159 try: 00160 chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00161 except: 00162 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00163 return -15 00164 00165 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0] 00166 if not '--relval' in s[2][index]: 00167 print 'Impossible to create task from scratch without splitting information with --relval' 00168 return -12 00169 else: 00170 arg=s[2][index].split() 00171 ns=map(int,arg[arg.index('--relval')+1].split(',')) 00172 chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0] 00173 chainDict['nowmTasklist'][-1]['SplittingArguments']['events_per_job'] = ns[1] 00174 if 'FASTSIM' in s[2][index]: 00175 thisLabel+='_FastSim' 00176 00177 elif nextHasDSInput: 00178 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput)) 00179 try: 00180 
chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00181 except: 00182 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00183 return -15 00184 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet 00185 splitForThisWf=nextHasDSInput.split 00186 chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf 00187 if step in wmsplit: 00188 chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=wmsplit[step] 00189 # get the run numbers or #events 00190 if len(nextHasDSInput.run): 00191 chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run 00192 #print "what is s",s[2][index] 00193 if '--data' in s[2][index] and nextHasDSInput.label: 00194 thisLabel+='_RelVal_%s'%nextHasDSInput.label 00195 if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']: 00196 print "This has an input DS and a filter sequence: very likely to be the PyQuen sample" 00197 processStrPrefix='PU_' 00198 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0] 00199 nextHasDSInput=None 00200 else: 00201 #not first step and no inputDS 00202 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask)) 00203 try: 00204 chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00205 except: 00206 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00207 return -15 00208 if splitForThisWf: 00209 chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=splitForThisWf 00210 if step in wmsplit: 00211 chainDict['nowmTasklist'][-1]['SplittingArguments']['lumis_per_job']=wmsplit[step] 00212 00213 #print step 00214 chainDict['nowmTasklist'][-1]['TaskName']=step 00215 chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step) 00216 chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper 
parameter name 00217 chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain 00218 if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']: 00219 chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup'] 00220 if '--pileup' in s[2][index]: 00221 processStrPrefix='PU_' 00222 if acqEra: 00223 #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel 00224 chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion'] 00225 chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel 00226 else: 00227 #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel 00228 chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion'] 00229 chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel 00230 00231 index+=1 00232 00233 #wrap up for this one 00234 import pprint 00235 #print 'wrapping up' 00236 #pprint.pprint(chainDict) 00237 #loop on the task list 00238 for i_second in reversed(range(len(chainDict['nowmTasklist']))): 00239 t_second=chainDict['nowmTasklist'][i_second] 00240 #print "t_second taskname", t_second['TaskName'] 00241 if 'primary' in t_second['nowmIO']: 00242 #print t_second['nowmIO']['primary'] 00243 primary=t_second['nowmIO']['primary'][0].replace('file:','') 00244 for i_input in reversed(range(0,i_second)): 00245 t_input=chainDict['nowmTasklist'][i_input] 00246 for (om,o) in t_input['nowmIO'].items(): 00247 if primary in o: 00248 #print "found",primary,"procuced by",om,"of",t_input['TaskName'] 00249 t_second['InputTask'] = t_input['TaskName'] 00250 t_second['InputFromOutputModule'] = om 00251 #print 
't_second',pprint.pformat(t_second) 00252 if t_second['TaskName'].startswith('HARVEST'): 00253 chainDict.update(copy.deepcopy(self.defaultHarvest)) 00254 chainDict['DQMConfigCacheID']=t_second['ConfigCacheID'] 00255 ## the info are not in the task specific dict but in the general dict 00256 #t_input.update(copy.deepcopy(self.defaultHarvest)) 00257 #t_input['DQMConfigCacheID']=t_second['ConfigCacheID'] 00258 break 00259 00260 ## there is in fact only one acquisition era 00261 #if len(set(chainDict['AcquisitionEra'].values()))==1: 00262 # print "setting only one acq" 00263 if acqEra: 00264 chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0] 00265 00266 ## clean things up now 00267 itask=0 00268 if self.keep: 00269 for i in self.keep: 00270 if type(i)==int and i < len(chainDict['nowmTasklist']): 00271 chainDict['nowmTasklist'][i]['KeepOutput']=True 00272 for (i,t) in enumerate(chainDict['nowmTasklist']): 00273 if t['TaskName'].startswith('HARVEST'): 00274 continue 00275 if not self.keep: 00276 t['KeepOutput']=True 00277 elif t['TaskName'] in self.keep: 00278 t['KeepOutput']=True 00279 t.pop('nowmIO') 00280 itask+=1 00281 chainDict['Task%d'%(itask)]=t 00282 00283 00284 ## 00285 00286 00287 ## provide the number of tasks 00288 chainDict['TaskChain']=itask#len(chainDict['nowmTasklist']) 00289 00290 chainDict.pop('nowmTasklist') 00291 self.chainDicts[n]=chainDict 00292 00293 00294 return 0
def submit(self):
    """Submit every prepared request, or only display it in test mode.

    Definition at line 344 of file MatrixInjector.py.

    Each request dict in ``self.chainDicts`` (keyed by workflow number) is
    pretty-printed.  Outside test mode it is then sent to ``self.wmagent``
    with ``makeRequest``, approved with ``approveRequest``, and followed by
    ``random_sleep()``.

    Exits the process with status -17 when the wmcontrol modules cannot be
    imported and ``self.testMode`` is false.  In test mode a missing
    wmcontrol is only reported, since nothing is sent.
    """
    try:
        from modules.wma import makeRequest, approveRequest
        from wmcontrol import random_sleep
        print('\n\tFound wmcontrol\n')
    except ImportError:
        # Only a missing module is expected here; any other failure should
        # surface with its own traceback instead of this message.
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-17)

    import pprint
    for (n, d) in self.chainDicts.items():
        if self.testMode:
            print("Only viewing request %s" % (n,))
            # pprint.pprint writes to stdout itself and returns None, so it
            # must not be wrapped in print (that emitted a stray "None").
            pprint.pprint(d)
        else:
            # submit each request dict to the wmagent
            print("For eyes before submitting %s" % (n,))
            pprint.pprint(d)
            print("Submitting %s ..........." % (n,))
            workFlow = makeRequest(self.wmagent, d, encodeDict=True)
            approveRequest(self.wmagent, workFlow)
            print("........... %s submitted" % (n,))
            random_sleep()
def upload(self):
    """Upload each prepared request's configs to CouchDB and store the IDs.

    Definition at line 324 of file MatrixInjector.py.

    For every request dict in ``self.chainDicts`` (keyed by workflow number):

    * each ``Task*`` entry other than ``TaskChain`` has its local
      ``ConfigCacheID`` path passed to ``self.uploadConf``, labelled with the
      workflow number followed by the task name; the path is then replaced
      by the returned ID;
    * a top-level ``DQMConfigCacheID`` is handled the same way, labelled
      with the workflow number followed by ``harvesting``.

    One line is printed per upload.
    """
    for wf_num, request in self.chainDicts.items():
        wf_tag = str(wf_num)
        for key in request:
            if key == 'DQMConfigCacheID':
                dqm_cfg = request['DQMConfigCacheID']
                dqm_id = self.uploadConf(dqm_cfg,
                                         wf_tag + 'harvesting',
                                         request['CouchURL'])
                print("%s uploaded to couchDB for %s with ID %s" % (dqm_cfg, wf_tag, dqm_id))
                request['DQMConfigCacheID'] = dqm_id
            elif key.startswith("Task") and key != 'TaskChain':
                task = request[key]
                local_cfg = task['ConfigCacheID']
                couch_id = self.uploadConf(local_cfg,
                                           wf_tag + task['TaskName'],
                                           request['CouchURL'])
                # two spaces before "uploaded" reproduce the existing log format
                print("%s  uploaded to couchDB for %s with ID %s" % (local_cfg, wf_tag, couch_id))
                task['ConfigCacheID'] = couch_id
def uploadConf(self, filePath, label, where):
    """Upload one configuration file to CouchDB and return its cache ID.

    Definition at line 295 of file MatrixInjector.py.

    Args:
        filePath: path of the configuration file; its last '/'-separated
            component is the key of the local ``self.couchCache``.
        label: suffix joined to ``self.label`` with '_' to form the label
            stored in CouchDB.
        where: CouchDB URL handed to ``upload_to_couch``.

    Returns:
        In test mode, a fake ID: ``self.count`` after incrementing it (no
        upload is done and the cache is not consulted).  Otherwise the ID
        returned by ``upload_to_couch``, or the cached ID when a file with
        the same base name was uploaded before.

    Exits the process with status -16 when ``modules.wma`` cannot be
    imported outside test mode.
    """
    labelInCouch = self.label + '_' + label
    # NOTE(review): keyed on the base name only, so equally named files from
    # different directories share one upload (deliberate config recycling);
    # confirm such files are really identical.
    cacheName = filePath.split('/')[-1]
    if self.testMode:
        self.count += 1
        print('\tFake upload of %s to couch with label %s' % (filePath, labelInCouch))
        return self.count

    try:
        from modules.wma import upload_to_couch
    except ImportError:
        # Only a missing module is reported this way; other import-time
        # errors propagate with their own traceback.
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        print('\n\t QUIT\n')
        sys.exit(-16)

    if cacheName in self.couchCache:
        print("Not re-uploading %s to %s for %s" % (filePath, where, label))
        cacheId = self.couchCache[cacheName]
    else:
        print("Loading %s to %s for %s" % (filePath, where, label))
        cacheId = upload_to_couch(filePath,
                                  labelInCouch,
                                  self.user,
                                  self.group,
                                  test_mode=False,
                                  url=where)
        self.couchCache[cacheName] = cacheId
    return cacheId
chainDicts: Definition at line 25 of file MatrixInjector.py.
couch: Definition at line 25 of file MatrixInjector.py.
couchCache: Definition at line 25 of file MatrixInjector.py.
couchDB: Definition at line 25 of file MatrixInjector.py.
count: Definition at line 25 of file MatrixInjector.py.
defaultChain: Definition at line 25 of file MatrixInjector.py.
defaultHarvest: Definition at line 25 of file MatrixInjector.py.
defaultInput: Definition at line 25 of file MatrixInjector.py.
defaultScratch: Definition at line 25 of file MatrixInjector.py.
defaultTask: Definition at line 25 of file MatrixInjector.py.
group: Definition at line 25 of file MatrixInjector.py.
keep: Definition at line 25 of file MatrixInjector.py.
label: Definition at line 25 of file MatrixInjector.py.
speciallabel: Definition at line 25 of file MatrixInjector.py.
testMode: Definition at line 25 of file MatrixInjector.py.
user: Definition at line 25 of file MatrixInjector.py.
version: Definition at line 25 of file MatrixInjector.py.
wmagent: Definition at line 25 of file MatrixInjector.py.