![]() |
![]() |
Public Member Functions | |
def | __init__ |
def | prepare |
def | submit |
def | upload |
def | uploadConf |
Public Attributes | |
chainDicts | |
couch | |
couchCache | |
count | |
defaultChain | |
defaultHarvest | |
defaultInput | |
defaultScratch | |
defaultTask | |
dqmgui | |
group | |
keep | |
label | |
speciallabel | |
testMode | |
user | |
version | |
wmagent |
Class `MatrixInjector`: Definition at line 36 of file MatrixInjector.py.
`__init__`: Definition at line 38 of file MatrixInjector.py.
00039 : 00040 self.count=1040 00041 00042 self.dqmgui=None 00043 self.wmagent=None 00044 for k in options.split(','): 00045 if k.startswith('dqm:'): 00046 self.dqmgui=k.split(':',1)[-1] 00047 elif k.startswith('wma:'): 00048 self.wmagent=k.split(':',1)[-1] 00049 00050 self.testMode=((mode!='submit') and (mode!='force')) 00051 self.version =1 00052 self.keep = opt.keep 00053 00054 #wagemt stuff 00055 if not self.wmagent: 00056 self.wmagent=os.getenv('WMAGENT_REQMGR') 00057 if not self.wmagent: 00058 self.wmagent = 'cmsweb.cern.ch' 00059 00060 if not self.dqmgui: 00061 self.dqmgui="https://cmsweb.cern.ch/dqm/relval" 00062 #couch stuff 00063 self.couch = 'https://'+self.wmagent+'/couchdb' 00064 # self.couchDB = 'reqmgr_config_cache' 00065 self.couchCache={} # so that we do not upload like crazy, and recyle cfgs 00066 self.user = os.getenv('USER') 00067 self.group = 'ppd' 00068 self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version) 00069 self.speciallabel='' 00070 if opt.label: 00071 self.speciallabel= '_'+opt.label 00072 00073 00074 if not os.getenv('WMCORE_ROOT'): 00075 print '\n\twmclient is not setup properly. 
Will not be able to upload or submit requests.\n' 00076 if not self.testMode: 00077 print '\n\t QUIT\n' 00078 sys.exit(-18) 00079 else: 00080 print '\n\tFound wmclient\n' 00081 00082 self.defaultChain={ 00083 "RequestType" : "TaskChain", #this is how we handle relvals 00084 "Requestor": self.user, #Person responsible 00085 "Group": self.group, #group for the request 00086 "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain) 00087 "Campaign": os.getenv('CMSSW_VERSION'), # only for wmstat purpose 00088 "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain) 00089 "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain) 00090 "GlobalTag": None, #Global Tag (overridden per task) 00091 "CouchURL": self.couch, #URL of CouchDB containing Config Cache 00092 "ConfigCacheURL": self.couch, #URL of CouchDB containing Config Cache 00093 "DbsUrl": "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet", 00094 #"CouchDBName": self.couchDB, #Name of Couch Database containing config cache 00095 #- Will contain all configs for all Tasks 00096 #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"], #Site whitelist 00097 "TaskChain" : None, #Define number of tasks in chain. 
00098 "nowmTasklist" : [], #a list of tasks as we put them in 00099 "unmergedLFNBase" : "/store/unmerged", 00100 "mergedLFNBase" : "/store/relval", 00101 "dashboardActivity" : "relval", 00102 "Memory" : 2400, 00103 "SizePerEvent" : 1234, 00104 "TimePerEvent" : 20 00105 } 00106 00107 self.defaultHarvest={ 00108 "EnableHarvesting" : "True", 00109 "DQMUploadUrl" : self.dqmgui, 00110 "DQMConfigCacheID" : None 00111 } 00112 00113 self.defaultScratch={ 00114 "TaskName" : None, #Task Name 00115 "ConfigCacheID" : None, #Generator Config id 00116 "GlobalTag": None, 00117 "SplittingAlgo" : "EventBased", #Splitting Algorithm 00118 "EventsPerJob" : None, #Size of jobs in terms of splitting algorithm 00119 "RequestNumEvents" : None, #Total number of events to generate 00120 "Seeding" : "AutomaticSeeding", #Random seeding method 00121 "PrimaryDataset" : None, #Primary Dataset to be created 00122 "nowmIO": {}, 00123 "KeepOutput" : False 00124 } 00125 self.defaultInput={ 00126 "TaskName" : "DigiHLT", #Task Name 00127 "ConfigCacheID" : None, #Processing Config id 00128 "GlobalTag": None, 00129 "InputDataset" : None, #Input Dataset to be processed 00130 "SplittingAlgo" : "LumiBased", #Splitting Algorithm 00131 "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm 00132 "nowmIO": {}, 00133 "KeepOutput" : False 00134 } 00135 self.defaultTask={ 00136 "TaskName" : None, #Task Name 00137 "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry) 00138 "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process 00139 "ConfigCacheID" : None, #Processing Config id 00140 "GlobalTag": None, 00141 "SplittingAlgo" : "LumiBased", #Splitting Algorithm 00142 "LumisPerJob" : 10, #Size of jobs in terms of splitting algorithm 00143 "nowmIO": {}, 00144 "KeepOutput" : False 00145 } 00146 00147 self.chainDicts={} 00148
`prepare`: Definition at line 149 of file MatrixInjector.py.
00150 : 00151 try: 00152 #from Configuration.PyReleaseValidation.relval_steps import wmsplit 00153 wmsplit = {} 00154 wmsplit['DIGIHI']=5 00155 wmsplit['RECOHI']=5 00156 wmsplit['HLTD']=5 00157 wmsplit['RECODreHLT']=2 00158 wmsplit['DIGIPU']=4 00159 wmsplit['DIGIPU1']=4 00160 wmsplit['RECOPU1']=1 00161 wmsplit['DIGIHISt3']=5 00162 wmsplit['RECOHISt4']=5 00163 wmsplit['SingleMuPt10_ID']=1 00164 wmsplit['DIGI_ID']=1 00165 wmsplit['RECO_ID']=1 00166 wmsplit['TTbar_ID']=1 00167 wmsplit['SingleMuPt10FS_ID']=1 00168 wmsplit['TTbarFS_ID']=1 00169 00170 #import pprint 00171 #pprint.pprint(wmsplit) 00172 except: 00173 print "Not set up for step splitting" 00174 wmsplit={} 00175 00176 acqEra=False 00177 for (n,dir) in directories.items(): 00178 chainDict=copy.deepcopy(self.defaultChain) 00179 print "inspecting",dir 00180 nextHasDSInput=None 00181 for (x,s) in mReader.workFlowSteps.items(): 00182 #x has the format (num, prefix) 00183 #s has the format (num, name, commands, stepList) 00184 if x[0]==n: 00185 #print "found",n,s[3] 00186 #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0] 00187 index=0 00188 splitForThisWf=None 00189 thisLabel=self.speciallabel 00190 processStrPrefix='' 00191 setPrimaryDs=None 00192 for step in s[3]: 00193 00194 if 'INPUT' in step or (not isinstance(s[2][index],str)): 00195 nextHasDSInput=s[2][index] 00196 00197 else: 00198 00199 if (index==0): 00200 #first step and not input -> gen part 00201 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch)) 00202 try: 00203 chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00204 except: 00205 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00206 return -15 00207 00208 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0] 00209 if not '--relval' in s[2][index]: 00210 print 'Impossible to create task from scratch without splitting information with --relval' 
00211 return -12 00212 else: 00213 arg=s[2][index].split() 00214 ns=map(int,arg[arg.index('--relval')+1].split(',')) 00215 chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0] 00216 chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1] 00217 if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]: 00218 thisLabel+='_FastSim' 00219 00220 elif nextHasDSInput: 00221 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput)) 00222 try: 00223 chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00224 except: 00225 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00226 return -15 00227 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet 00228 splitForThisWf=nextHasDSInput.split 00229 chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf 00230 if step in wmsplit: 00231 chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step] 00232 # get the run numbers or #events 00233 if len(nextHasDSInput.run): 00234 chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run 00235 #print "what is s",s[2][index] 00236 if '--data' in s[2][index] and nextHasDSInput.label: 00237 thisLabel+='_RelVal_%s'%nextHasDSInput.label 00238 if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']: 00239 print "This has an input DS and a filter sequence: very likely to be the PyQuen sample" 00240 processStrPrefix='PU_' 00241 setPrimaryDs = 'RelVal'+s[1].split('+')[0] 00242 if setPrimaryDs: 00243 chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs 00244 nextHasDSInput=None 00245 else: 00246 #not first step and no inputDS 00247 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask)) 00248 try: 00249 chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00250 except: 00251 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00252 return -15 00253 if splitForThisWf: 00254 
chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf 00255 if step in wmsplit: 00256 chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step] 00257 00258 #print step 00259 chainDict['nowmTasklist'][-1]['TaskName']=step 00260 if setPrimaryDs: 00261 chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs 00262 chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step) 00263 chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name 00264 chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain 00265 if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']: 00266 chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup'] 00267 if '--pileup' in s[2][index]: 00268 processStrPrefix='PU_' 00269 00270 if acqEra: 00271 #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel 00272 chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion'] 00273 chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel 00274 else: 00275 #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel 00276 chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion'] 00277 chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','')+thisLabel 00278 00279 index+=1 00280 #end of loop through steps 00281 chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0] 00282 if processStrPrefix or thisLabel: 00283 chainDict['RequestString']+='_'+processStrPrefix+thisLabel 00284 00285 00286 00287 #wrap up for this one 00288 import pprint 00289 #print 'wrapping up' 00290 
#pprint.pprint(chainDict) 00291 #loop on the task list 00292 for i_second in reversed(range(len(chainDict['nowmTasklist']))): 00293 t_second=chainDict['nowmTasklist'][i_second] 00294 #print "t_second taskname", t_second['TaskName'] 00295 if 'primary' in t_second['nowmIO']: 00296 #print t_second['nowmIO']['primary'] 00297 primary=t_second['nowmIO']['primary'][0].replace('file:','') 00298 for i_input in reversed(range(0,i_second)): 00299 t_input=chainDict['nowmTasklist'][i_input] 00300 for (om,o) in t_input['nowmIO'].items(): 00301 if primary in o: 00302 #print "found",primary,"procuced by",om,"of",t_input['TaskName'] 00303 t_second['InputTask'] = t_input['TaskName'] 00304 t_second['InputFromOutputModule'] = om 00305 #print 't_second',pprint.pformat(t_second) 00306 if t_second['TaskName'].startswith('HARVEST'): 00307 chainDict.update(copy.deepcopy(self.defaultHarvest)) 00308 chainDict['DQMConfigCacheID']=t_second['ConfigCacheID'] 00309 ## the info are not in the task specific dict but in the general dict 00310 #t_input.update(copy.deepcopy(self.defaultHarvest)) 00311 #t_input['DQMConfigCacheID']=t_second['ConfigCacheID'] 00312 break 00313 00314 ## there is in fact only one acquisition era 00315 #if len(set(chainDict['AcquisitionEra'].values()))==1: 00316 # print "setting only one acq" 00317 if acqEra: 00318 chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0] 00319 00320 ## clean things up now 00321 itask=0 00322 if self.keep: 00323 for i in self.keep: 00324 if type(i)==int and i < len(chainDict['nowmTasklist']): 00325 chainDict['nowmTasklist'][i]['KeepOutput']=True 00326 for (i,t) in enumerate(chainDict['nowmTasklist']): 00327 if t['TaskName'].startswith('HARVEST'): 00328 continue 00329 if not self.keep: 00330 t['KeepOutput']=True 00331 elif t['TaskName'] in self.keep: 00332 t['KeepOutput']=True 00333 t.pop('nowmIO') 00334 itask+=1 00335 chainDict['Task%d'%(itask)]=t 00336 00337 00338 ## 00339 00340 00341 ## provide the number of tasks 00342 
chainDict['TaskChain']=itask#len(chainDict['nowmTasklist']) 00343 00344 chainDict.pop('nowmTasklist') 00345 self.chainDicts[n]=chainDict 00346 00347 00348 return 0
def submit(self):
    """Submit (or, in test mode, only display) every prepared request.

    Iterates over ``self.chainDicts`` (request key -> request dict).  When
    ``self.testMode`` is true each request is only pretty-printed.  Otherwise
    each request is pretty-printed, created on ``self.wmagent`` via
    ``makeRequest(..., encodeDict=True)``, approved via ``approveRequest`` and
    followed by ``random_sleep()``.

    Calls ``sys.exit(-17)`` when the wmcontrol modules cannot be imported and
    the injector is not in test mode.
    """
    import pprint

    try:
        from modules.wma import makeRequest, approveRequest
        from wmcontrol import random_sleep
        print('\n\tFound wmcontrol\n')
    except ImportError:
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        # Without wmcontrol we can still *view* requests, but not submit them.
        if not self.testMode:
            print('\n\t QUIT\n')
            sys.exit(-17)

    for (n, d) in self.chainDicts.items():
        if self.testMode:
            print("Only viewing request %s" % (n,))
            # pprint.pprint writes to stdout itself and returns None; wrapping
            # it in print (as before) emitted a spurious "None" line.
            pprint.pprint(d)
        else:
            # submit each request dict to the WMAgent request manager
            print("For eyes before submitting %s" % (n,))
            pprint.pprint(d)
            print("Submitting %s ..........." % (n,))
            workFlow = makeRequest(self.wmagent, d, encodeDict=True)
            approveRequest(self.wmagent, workFlow)
            print("........... %s submitted" % (n,))
            random_sleep()
def upload(self):
    """Upload the configuration files of every prepared request to CouchDB.

    In each request dict of ``self.chainDicts``, every ``Task*`` entry (any key
    starting with ``"Task"`` other than ``"TaskChain"``) and the top-level
    ``DQMConfigCacheID`` hold a local config path.  Each path is handed to
    ``self.uploadConf`` with a label derived from the request key and the
    request's ``CouchURL``.  The id it returns replaces the path in place.
    """
    for request_key, request in self.chainDicts.items():
        prefix = str(request_key)
        for field in request:
            if field.startswith("Task") and field != 'TaskChain':
                task = request[field]
                local_cfg = task['ConfigCacheID']
                new_id = self.uploadConf(local_cfg,
                                         prefix + task['TaskName'],
                                         request['CouchURL'])
                print("%s  uploaded to couchDB for %s with ID %s" % (local_cfg, prefix, new_id))
                task['ConfigCacheID'] = new_id
            elif field == 'DQMConfigCacheID':
                # the harvesting config lives in the request-level dict
                harvest_cfg = request['DQMConfigCacheID']
                new_id = self.uploadConf(harvest_cfg,
                                         prefix + 'harvesting',
                                         request['CouchURL'])
                print("%s uploaded to couchDB for %s with ID %s" % (harvest_cfg, prefix, new_id))
                request['DQMConfigCacheID'] = new_id
def uploadConf(self, filePath, label, where):
    """Upload one configuration file to CouchDB and return its cache id.

    :param filePath: path of the configuration file.  Its basename is the key
        of ``self.couchCache``, so a file with the same basename is uploaded
        only once.
    :param label: suffix appended to ``self.label`` to build the CouchDB label.
    :param where: CouchDB URL the file is uploaded to.
    :returns: in test mode, an incrementing fake id (``self.count``).
        Otherwise, the cached id for this basename or the id returned by
        ``upload_to_couch_oneArg``.

    Calls ``sys.exit(-16)`` when the wmcontrol modules cannot be imported
    (non-test mode only).
    """
    labelInCouch = self.label + '_' + label
    cacheName = filePath.split('/')[-1]

    if self.testMode:
        self.count += 1
        print('\tFake upload of %s to couch with label %s' % (filePath, labelInCouch))
        return self.count

    try:
        # Only verifies that wmcontrol is importable; the upload itself goes
        # through upload_to_couch_oneArg in a child process below.
        from modules.wma import upload_to_couch, DATABASE_NAME  # noqa: F401
    except ImportError:
        print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
        print('\n\t QUIT\n')
        sys.exit(-16)

    if cacheName in self.couchCache:
        print("Not re-uploading %s to %s for %s" % (filePath, where, label))
        return self.couchCache[cacheName]

    print("Loading %s to %s for %s" % (filePath, where, label))
    ## totally fork the upload to couch to prevent cross loading of process configurations
    pool = multiprocessing.Pool(1)
    try:
        cacheIds = pool.map(upload_to_couch_oneArg,
                            [(filePath, labelInCouch, self.user, self.group, where)])
    finally:
        # The pool was previously never closed, leaving one idle worker
        # process behind for every uploaded configuration.
        pool.close()
        pool.join()
    cacheId = cacheIds[0]
    self.couchCache[cacheName] = cacheId
    return cacheId
`chainDicts`: Definition at line 39 of file MatrixInjector.py.
`couch`: Definition at line 38 of file MatrixInjector.py.
`couchCache`: Definition at line 39 of file MatrixInjector.py.
`count`: Definition at line 38 of file MatrixInjector.py.
`defaultChain`: Definition at line 39 of file MatrixInjector.py.
`defaultHarvest`: Definition at line 39 of file MatrixInjector.py.
`defaultInput`: Definition at line 39 of file MatrixInjector.py.
`defaultScratch`: Definition at line 39 of file MatrixInjector.py.
`defaultTask`: Definition at line 39 of file MatrixInjector.py.
`dqmgui`: Definition at line 38 of file MatrixInjector.py.
`group`: Definition at line 39 of file MatrixInjector.py.
`keep`: Definition at line 38 of file MatrixInjector.py.
`label`: Definition at line 39 of file MatrixInjector.py.
`speciallabel`: Definition at line 39 of file MatrixInjector.py.
`testMode`: Definition at line 38 of file MatrixInjector.py.
`user`: Definition at line 39 of file MatrixInjector.py.
`version`: Definition at line 38 of file MatrixInjector.py.
`wmagent`: Definition at line 38 of file MatrixInjector.py.