00001 import sys
00002 import json
00003 import os
00004 import copy
00005
def performInjectionOptionTest(opt):
    """Check the injection-related options and adjust them in place.

    ``opt`` is the parsed options object; this function reads ``opt.show``,
    ``opt.wmcontrol`` and ``opt.nThreads`` and may overwrite ``opt.nThreads``
    and ``opt.dryRun``:

    * ``show`` set                     -> message, ``sys.exit(-1)``
    * ``wmcontrol == 'init'``          -> ``nThreads`` forced to 0
    * ``wmcontrol == 'test'``          -> ``dryRun`` forced to True
    * ``wmcontrol == 'submit'`` with
      ``nThreads == 0``                -> message, ``sys.exit(-1)``
    * ``wmcontrol == 'force'``         -> warning, ``dryRun`` forced to True

    Returns None. Raises SystemExit for the two rejected combinations.
    """
    # Single-argument print(...) produces the same output under the Python 2
    # print statement and the Python 3 print function.
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
        sys.exit(-1)

    # The wmcontrol values are mutually exclusive, so one chain is enough.
    wmcontrol = opt.wmcontrol
    if wmcontrol == 'init':
        opt.nThreads = 0
    elif wmcontrol == 'test':
        opt.dryRun = True
    elif wmcontrol == 'submit':
        if opt.nThreads == 0:
            print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
            sys.exit(-1)
    elif wmcontrol == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
00022
class MatrixInjector(object):
    """Build TaskChain request dictionaries and upload/submit them.

    Typical sequence: ``prepare()`` fills ``self.chainDicts`` (one request
    dictionary per workflow number), ``upload()`` replaces the configuration
    file paths stored in those dictionaries by the IDs returned from
    ``uploadConf()``, and ``submit()`` prints (test mode) or injects the
    requests.

    NOTE(review): the original text of this class had lost its indentation;
    the nesting below was reconstructed from the control flow. Please check
    it against the reference version before merging.
    """

    def __init__(self, opt, mode='init'):
        """Set up endpoints, labels and the request/task templates.

        Parameters:
            opt:  options object; ``opt.keep`` and ``opt.label`` are read.
            mode: anything other than 'submit' or 'force' selects test mode,
                  in which uploads are faked and requests are only printed.

        Environment variables read: WMAGENT_REQMGR, USER, CMSSW_VERSION,
        SCRAM_ARCH, WMCORE_ROOT.

        Raises SystemExit(-18) when WMCORE_ROOT is unset outside test mode.
        """
        self.count = 1040  # counter used to fabricate IDs for fake uploads
        self.testMode = mode not in ('submit', 'force')
        self.version = 1
        self.keep = opt.keep

        # Fall back to the default host when the variable is unset or empty.
        self.wmagent = os.getenv('WMAGENT_REQMGR') or 'cmsweb.cern.ch'

        self.couch = 'https://' + self.wmagent + '/couchdb'
        self.couchDB = 'reqmgr_config_cache'
        self.couchCache = {}  # config file name -> ID of a previous upload
        self.user = os.getenv('USER')
        self.group = 'ppd'
        # CMSSW_VERSION must be set: .replace() below fails on None.
        cmsswVersion = os.getenv('CMSSW_VERSION')
        self.label = ('RelValSet_' + cmsswVersion.replace('-', '')
                      + '_v' + str(self.version))
        self.speciallabel = ''
        if opt.label:
            self.speciallabel = '_' + opt.label

        if not os.getenv('WMCORE_ROOT'):
            print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-18)
        else:
            print('\n\tFound wmclient\n')

        # Template of the request as a whole. Keys starting with 'nowm' are
        # scratch entries that prepare() removes before storing the request.
        self.defaultChain = {
            "RequestType": "TaskChain",
            "AcquisitionEra": {},
            "ProcessingString": {},
            "Requestor": self.user,
            "Group": self.group,
            "CMSSWVersion": cmsswVersion,
            "Campaign": cmsswVersion,
            "ScramArch": os.getenv('SCRAM_ARCH'),
            "ProcessingVersion": self.version,
            "GlobalTag": None,
            "CouchURL": self.couch,
            "CouchDBName": self.couchDB,
            "SiteWhitelist": ["T2_CH_CERN", "T1_US_FNAL"],
            "TaskChain": None,
            "nowmTasklist": [],
            "unmergedLFNBase": "/store/unmerged",
            "mergedLFNBase": "/store/relval",
            "dashboardActivity": "relval",
            "Memory": 2400,
            "SizePerEvent": 1234,
            "TimePerEvent": 20,
        }

        # Extra request-level keys merged in when a HARVEST task is linked.
        self.defaultHarvest = {
            "EnableDQMHarvest": 1,
            "DQMUploadUrl": "https://cmsweb.cern.ch/dqm/relval",
            "DQMConfigCacheID": None,
        }

        # Template for a first step created from the '--relval' numbers.
        self.defaultScratch = {
            "TaskName": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgorithm": "EventBased",
            "SplittingArguments": {"events_per_job": None},
            "RequestNumEvents": None,
            "Seeding": "AutomaticSeeding",
            "PrimaryDataset": None,
            "nowmIO": {},
            "KeepOutput": False,
        }
        # Template for a step that follows an input-dataset description.
        self.defaultInput = {
            "TaskName": "DigiHLT",
            "ConfigCacheID": None,
            "GlobalTag": None,
            "InputDataset": None,
            "SplittingAlgorithm": "LumiBased",
            "SplittingArguments": {"lumis_per_job": 10},
            "nowmIO": {},
            "KeepOutput": False,
        }
        # Template for every other step.
        self.defaultTask = {
            "TaskName": None,
            "InputTask": None,
            "InputFromOutputModule": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgorithm": "LumiBased",
            "SplittingArguments": {"lumis_per_job": 10},
            "nowmIO": {},
            "KeepOutput": False,
        }

        self.chainDicts = {}

    @staticmethod
    def _readStepIO(ioPath):
        """Return the JSON content of ``ioPath``, or None if it cannot be read or parsed."""
        try:
            with open(ioPath) as ioFile:
                return json.load(ioFile)
        except (IOError, OSError, ValueError):
            return None

    def _linkTasks(self, chainDict):
        """Fill InputTask/InputFromOutputModule by matching 'primary' inputs to earlier outputs."""
        tasks = chainDict['nowmTasklist']
        for i_second in reversed(range(len(tasks))):
            t_second = tasks[i_second]
            if 'primary' not in t_second['nowmIO']:
                continue
            primary = t_second['nowmIO']['primary'][0].replace('file:', '')
            # Walk the earlier tasks, nearest one first.
            for t_input in reversed(tasks[:i_second]):
                for (om, o) in t_input['nowmIO'].items():
                    if primary in o:
                        t_second['InputTask'] = t_input['TaskName']
                        t_second['InputFromOutputModule'] = om
                        if t_second['TaskName'].startswith('HARVEST'):
                            # Harvesting settings live in the request itself,
                            # not in a task dictionary.
                            chainDict.update(copy.deepcopy(self.defaultHarvest))
                            chainDict['DQMConfigCacheID'] = t_second['ConfigCacheID']
                        # Only the output-module scan stops here; earlier
                        # tasks are still examined.
                        break

    def _finalizeTasks(self, chainDict):
        """Apply the keep selection and move the task list into Task1..TaskN keys."""
        tasks = chainDict['nowmTasklist']
        if self.keep:
            # Integer entries of keep select tasks by position.
            for i in self.keep:
                if isinstance(i, int) and i < len(tasks):
                    tasks[i]['KeepOutput'] = True
        itask = 0
        for t in tasks:
            if t['TaskName'].startswith('HARVEST'):
                continue  # HARVEST steps are not numbered tasks
            # With no keep selection every task keeps its output; otherwise
            # string entries of keep select tasks by name.
            if not self.keep or t['TaskName'] in self.keep:
                t['KeepOutput'] = True
            t.pop('nowmIO')
            itask += 1
            chainDict['Task%d' % (itask)] = t
        chainDict['TaskChain'] = itask
        chainDict.pop('nowmTasklist')

    def prepare(self, mReader, directories, mode='init'):
        """Build one request dictionary per entry of ``directories``.

        Parameters:
            mReader:     object whose ``workFlowSteps`` dict maps keys ``x``
                         (``x[0]`` is the workflow number) to sequences ``s``
                         where ``s[1]`` is a '+'-separated name, ``s[2]`` the
                         per-step commands and ``s[3]`` the step names.
            directories: dict, workflow number -> directory holding the
                         ``<step>.io`` and ``<step>.py`` files.
            mode:        accepted but not used here.

        Results are stored in ``self.chainDicts``.

        Returns:
            0 on success, -15 when a ``<step>.io`` file cannot be loaded,
            -12 when a first step has no usable '--relval N,M' argument.
        """
        import pprint
        try:
            from Configuration.PyReleaseValidation.relval_steps import wmsplit
        except Exception:
            # Best effort: without the table no per-step override is applied.
            print("Not set up for step splitting")
            wmsplit = {}
        else:
            pprint.pprint(wmsplit)

        # Switch between request-level (True) and per-task (False)
        # AcquisitionEra/ProcessingString bookkeeping.
        acqEra = False
        for (n, workDir) in directories.items():
            chainDict = copy.deepcopy(self.defaultChain)
            print("inspecting %s" % (workDir,))
            nextHasDSInput = None
            for (x, s) in mReader.workFlowSteps.items():
                if x[0] != n:
                    continue
                sampleName = s[1].split('+')[0]
                chainDict['RequestString'] = 'RV' + chainDict['CMSSWVersion'] + sampleName
                splitForThisWf = None
                thisLabel = self.speciallabel
                processStrPrefix = ''
                for (index, step) in enumerate(s[3]):
                    command = s[2][index]
                    if 'INPUT' in step or (not isinstance(command, str)):
                        # Not a task: remember the input description for the
                        # step that follows.
                        nextHasDSInput = command
                        continue

                    ioPath = '%s/%s.io' % (workDir, step)
                    stepIO = self._readStepIO(ioPath)
                    if stepIO is None:
                        print("Failed to find %s .The workflows were probably not run on cfg not created" % (ioPath,))
                        return -15

                    if index == 0:
                        task = copy.deepcopy(self.defaultScratch)
                        task['nowmIO'] = stepIO
                        task['PrimaryDataset'] = 'RelVal' + sampleName
                        ns = None
                        if '--relval' in command:
                            arg = command.split()
                            try:
                                ns = [int(v) for v in arg[arg.index('--relval') + 1].split(',')]
                                ns[1]  # both numbers are required
                            except (ValueError, IndexError):
                                ns = None
                        if ns is None:
                            print('Impossible to create task from scratch without splitting information with --relval')
                            return -12
                        task['RequestNumEvents'] = ns[0]
                        task['SplittingArguments']['events_per_job'] = ns[1]
                        if 'FASTSIM' in command:
                            thisLabel += '_FastSim'
                    elif nextHasDSInput:
                        task = copy.deepcopy(self.defaultInput)
                        task['nowmIO'] = stepIO
                        task['InputDataset'] = nextHasDSInput.dataSet
                        splitForThisWf = nextHasDSInput.split
                        task['SplittingArguments']['lumis_per_job'] = splitForThisWf
                        if step in wmsplit:
                            task['SplittingArguments']['lumis_per_job'] = wmsplit[step]
                        if len(nextHasDSInput.run):
                            task['RunWhitelist'] = nextHasDSInput.run
                        if '--data' in command and nextHasDSInput.label:
                            thisLabel += '_RelVal_%s' % nextHasDSInput.label
                        if 'filter' in task['nowmIO']:
                            print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
                            processStrPrefix = 'PU_'
                            task['PrimaryDataset'] = 'RelVal' + sampleName
                        nextHasDSInput = None
                    else:
                        task = copy.deepcopy(self.defaultTask)
                        task['nowmIO'] = stepIO
                        if splitForThisWf:
                            task['SplittingArguments']['lumis_per_job'] = splitForThisWf
                        if step in wmsplit:
                            task['SplittingArguments']['lumis_per_job'] = wmsplit[step]
                    chainDict['nowmTasklist'].append(task)

                    task['TaskName'] = step
                    task['ConfigCacheID'] = '%s/%s.py' % (workDir, step)
                    task['GlobalTag'] = task['nowmIO']['GT']
                    # The request-level tag ends up being the last task's tag.
                    chainDict['GlobalTag'] = task['nowmIO']['GT']
                    if 'pileup' in task['nowmIO']:
                        task['MCPileup'] = task['nowmIO']['pileup']
                    if '--pileup' in command:
                        processStrPrefix = 'PU_'
                    processingString = (processStrPrefix
                                        + task['GlobalTag'].replace('::All', '')
                                        + thisLabel)
                    if acqEra:
                        chainDict['AcquisitionEra'][step] = chainDict['CMSSWVersion']
                        chainDict['ProcessingString'][step] = processingString
                    else:
                        task['AcquisitionEra'] = chainDict['CMSSWVersion']
                        task['ProcessingString'] = processingString

            # Wrap up this workflow.
            self._linkTasks(chainDict)

            if acqEra:
                # Collapse the per-step map to a single value.
                chainDict['AcquisitionEra'] = list(chainDict['AcquisitionEra'].values())[0]

            self._finalizeTasks(chainDict)
            self.chainDicts[n] = chainDict

        return 0

    def uploadConf(self, filePath, label, where):
        """Upload one configuration file and return its ID.

        In test mode nothing is uploaded: ``self.count`` is incremented and
        returned. Otherwise ``modules.wma.upload_to_couch`` is called, unless
        a file with the same base name was uploaded before, in which case the
        cached ID is returned.

        Parameters:
            filePath: path of the configuration file.
            label:    suffix appended to ``self.label`` for the upload label.
            where:    URL passed to ``upload_to_couch``.

        Raises SystemExit(-16) when ``modules.wma`` cannot be imported
        outside test mode.
        """
        labelInCouch = self.label + '_' + label
        cacheName = filePath.split('/')[-1]
        if self.testMode:
            self.count += 1
            print('\tFake upload of %s to couch with label %s' % (filePath, labelInCouch))
            return self.count

        try:
            from modules.wma import upload_to_couch
        except Exception:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            print('\n\t QUIT\n')
            sys.exit(-16)

        if cacheName in self.couchCache:
            print("Not re-uploading %s to %s for %s" % (filePath, where, label))
            cacheId = self.couchCache[cacheName]
        else:
            print("Loading %s to %s for %s" % (filePath, where, label))
            cacheId = upload_to_couch(filePath,
                                      labelInCouch,
                                      self.user,
                                      self.group,
                                      test_mode=False,
                                      url=where)
            self.couchCache[cacheName] = cacheId
        return cacheId

    def upload(self):
        """Upload every task (and harvesting) configuration of every request.

        Each ``ConfigCacheID`` / ``DQMConfigCacheID`` in ``self.chainDicts``
        is replaced by the ID returned from ``uploadConf()``.
        """
        for (n, d) in self.chainDicts.items():
            # Iterate over a snapshot of the keys: values are rewritten below.
            for it in list(d):
                if it.startswith("Task") and it != 'TaskChain':
                    couchID = self.uploadConf(d[it]['ConfigCacheID'],
                                              str(n) + d[it]['TaskName'],
                                              d['CouchURL'])
                    print("%s  uploaded to couchDB for %s with ID %s"
                          % (d[it]['ConfigCacheID'], str(n), couchID))
                    d[it]['ConfigCacheID'] = couchID
                if it == 'DQMConfigCacheID':
                    couchID = self.uploadConf(d['DQMConfigCacheID'],
                                              str(n) + 'harvesting',
                                              d['CouchURL'])
                    print("%s uploaded to couchDB for %s with ID %s"
                          % (d['DQMConfigCacheID'], str(n), couchID))
                    d['DQMConfigCacheID'] = couchID

    def submit(self):
        """Print every request and, outside test mode, inject and approve it.

        Raises SystemExit(-17) when the wmcontrol modules cannot be imported
        outside test mode.
        """
        try:
            from modules.wma import makeRequest, approveRequest
            from wmcontrol import random_sleep
            print('\n\tFound wmcontrol\n')
        except Exception:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-17)

        import pprint
        for (n, d) in self.chainDicts.items():
            if self.testMode:
                print("Only viewing request %s" % (n,))
                # pprint.pprint() prints by itself and returns None; it must
                # not be wrapped in another print.
                pprint.pprint(d)
            else:
                print("For eyes before submitting %s" % (n,))
                pprint.pprint(d)
                print("Submitting %s ..........." % (n,))
                workFlow = makeRequest(self.wmagent, d, encodeDict=True)
                approveRequest(self.wmagent, workFlow)
                print("........... %s submitted" % (n,))
                random_sleep()
00369
00370
00371