![]() |
![]() |
Public Member Functions | |
def | __init__ |
def | prepare |
def | submit |
def | upload |
def | uploadConf |
Public Attributes | |
chainDicts | |
couch | |
couchDB | |
count | |
defaultChain | |
defaultInput | |
defaultScratch | |
defaultTask | |
group | |
label | |
testMode | |
user | |
version | |
wmagent |
Definition at line 24 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
00027 : 00028 self.count=1040 00029 self.testMode=((mode!='submit') and (mode!='force')) 00030 self.version ='v1' 00031 00032 #wagemt stuff 00033 self.wmagent=os.getenv('WMAGENT_REQMGR') 00034 if not self.wmagent: 00035 self.wmagent = 'cmsweb.cern.ch' 00036 00037 #couch stuff 00038 self.couch = 'https://'+self.wmagent+'/couchdb' 00039 self.couchDB = 'reqmgr_config_cache' 00040 self.user = os.getenv('USER') 00041 self.group = 'ppd' 00042 self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_'+self.version 00043 00044 00045 if not os.getenv('WMCORE_ROOT'): 00046 print '\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n' 00047 if not self.testMode: 00048 print '\n\t QUIT\n' 00049 sys.exit(-18) 00050 else: 00051 print '\n\tFound wmclient\n' 00052 00053 self.defaultChain={ 00054 "RequestType" : "TaskChain", #this is how we handle relvals 00055 "AcquisitionEra": "ReleaseValidation", #Acq Era 00056 "Requestor": self.user, #Person responsible 00057 "Group": self.group, #group for the request 00058 "CMSSWVersion": os.getenv('CMSSW_VERSION'), #CMSSW Version (used for all tasks in chain) 00059 "ScramArch": os.getenv('SCRAM_ARCH'), #Scram Arch (used for all tasks in chain) 00060 "ProcessingVersion": self.version, #Processing Version (used for all tasks in chain) 00061 "GlobalTag": None, #Global Tag (used for all tasks) 00062 "CouchURL": self.couch, #URL of CouchDB containing Config Cache 00063 "CouchDBName": self.couchDB, #Name of Couch Database containing config cache 00064 #- Will contain all configs for all Tasks 00065 "SiteWhitelist" : ["T1_CH_CERN", "T1_US_FNAL"], #Site whitelist 00066 "TaskChain" : None, #Define number of tasks in chain. 00067 "nowmTasklist" : [] #a list of tasks as we put them in 00068 } 00069 00070 self.defaultScratch={ 00071 "TaskName" : None, #Task Name 00072 "ConfigCacheID" : None, #Generator Config id 00073 "SplittingAlgorithm" : "EventBased", #Splitting Algorithm 00074 "SplittingArguments" : {"events_per_job" : 250}, #Size of jobs in terms of splitting algorithm 00075 #"RequestSizeEvents" : 10000, #Total number of events to generate 00076 "RequestNumEvents" : 10000, #Total number of events to generate 00077 "Seeding" : "AutomaticSeeding", #Random seeding method 00078 "PrimaryDataset" : None, #Primary Dataset to be created 00079 "nowmIO": {} 00080 } 00081 self.defaultInput={ 00082 "TaskName" : "DigiHLT", #Task Name 00083 "ConfigCacheID" : None, #Processing Config id 00084 "InputDataset" : None, #Input Dataset to be processed 00085 #"SplittingAlgorithm" : "FileBased", #Splitting Algorithm 00086 #"SplittingArguments" : {"files_per_job" : 1}, #Size of jobs in terms of splitting algorithm 00087 "SplittingAlgorithm" : "LumiBased", #Splitting Algorithm 00088 "SplittingArguments" : {"lumis_per_job" : 1}, #Size of jobs in terms of splitting algorithm 00089 "nowmIO": {} 00090 } 00091 self.defaultTask={ 00092 "TaskName" : None, #Task Name 00093 "InputTask" : None, #Input Task Name (Task Name field of a previous Task entry) 00094 "InputFromOutputModule" : None, #OutputModule name in the input task that will provide files to process 00095 "ConfigCacheID" : None, #Processing Config id 00096 "SplittingAlgorithm" : "FileBased", #Splitting Algorithm 00097 "SplittingArguments" : {"files_per_job" : 1 }, #Size of jobs in terms of splitting algorithm 00098 "nowmIO": {} 00099 } 00100 00101 self.chainDicts={} 00102
Definition at line 103 of file MatrixInjector.py.
00104 : 00105 00106 for (n,dir) in directories.items(): 00107 chainDict=copy.deepcopy(self.defaultChain) 00108 print "inspecting",dir 00109 nextHasDSInput=None 00110 for (x,s) in mReader.workFlowSteps.items(): 00111 #x has the format (num, prefix) 00112 #s has the format (num, name, commands, stepList) 00113 if x[0]==n: 00114 #print "found",n,s[3] 00115 chainDict['RequestString']='RV'+s[1].split('+')[0] 00116 index=0 00117 for step in s[3]: 00118 if 'INPUT' in step or (not isinstance(s[2][index],str)): 00119 nextHasDSInput=s[2][index] 00120 else: 00121 if 'HARVEST' in step: 00122 continue 00123 if (index==0): 00124 #first step and not input -> gen part 00125 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch)) 00126 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+step 00127 if not '--relval' in s[2][index]: 00128 print 'Impossible to create task from scratch' 00129 return -12 00130 else: 00131 arg=s[2][index].split() 00132 ns=map(int,arg[arg.index('--relval')+1].split(',')) 00133 chainDict['nowmTasklist'][-1]['RequestSizeEvents'] = ns[0] 00134 chainDict['nowmTasklist'][-1]['SplittingArguments']['events_per_job'] = ns[1] 00135 elif nextHasDSInput: 00136 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput)) 00137 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet 00138 # get the run numbers or #events 00139 if len(nextHasDSInput.run): 00140 chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run 00141 nextHasDSInput=None 00142 else: 00143 #not first step and no inputDS 00144 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask)) 00145 #print step 00146 chainDict['nowmTasklist'][-1]['TaskName']=step 00147 try: 00148 chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read()) 00149 except: 00150 print "Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created" 00151 return -15 00152 chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step) 00153 chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] 00154 index+=1 00155 00156 #wrap up for this one 00157 #print 'wrapping up' 00158 chainDict['TaskChain']=len(chainDict['nowmTasklist']) 00159 #loop on the task list 00160 for i_second in reversed(range(len(chainDict['nowmTasklist']))): 00161 #for t_second in reversed(chainDict['nowmTasklist']): 00162 t_second=chainDict['nowmTasklist'][i_second] 00163 #print "t_second taskname", t_second['TaskName'] 00164 if 'primary' in t_second['nowmIO']: 00165 #print t_second['nowmIO']['primary'] 00166 primary=t_second['nowmIO']['primary'][0].replace('file:','') 00167 #for t_input in reversed(chainDict['nowmTasklist']): 00168 for i_input in reversed(range(0,i_second)): 00169 t_input=chainDict['nowmTasklist'][i_input] 00170 for (om,o) in t_input['nowmIO'].items(): 00171 if primary in o: 00172 #print "found",primary,"procuced by",om,"of",t_input['TaskName'] 00173 t_second['InputTask'] = t_input['TaskName'] 00174 t_second['InputFromOutputModule'] = om 00175 #print 't_second',t_second 00176 break 00177 for (i,t) in enumerate(chainDict['nowmTasklist']): 00178 t.pop('nowmIO') 00179 chainDict['Task%d'%(i+1)]=t 00180 00181 chainDict.pop('nowmTasklist') 00182 self.chainDicts[n]=chainDict 00183 00184 00185 return 0
def MatrixInjector::MatrixInjector::submit | ( | self | ) |
Definition at line 219 of file MatrixInjector.py.
00220 : 00221 try: 00222 from wmcontrol2_newauth import makeRequest,approveRequest,random_sleep 00223 print '\n\tFound wmcontrol\n' 00224 except: 00225 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n' 00226 if not self.testMode: 00227 print '\n\t QUIT\n' 00228 sys.exit(-17) 00229 00230 import pprint 00231 for (n,d) in self.chainDicts.items(): 00232 if self.testMode: 00233 print "Only viewing request",n 00234 print pprint.pprint(d) 00235 else: 00236 #submit to wmagent each dict 00237 print "For eyes before submitting",n 00238 print pprint.pprint(d) 00239 print "Submitting",n,"..........." 00240 workFlow=makeRequest(self.wmagent,d) 00241 approveRequest(self.wmagent,workFlow) 00242 print "...........",n,"submitted" 00243 random_sleep() 00244 00245 00246
def MatrixInjector::MatrixInjector::upload | ( | self | ) |
Definition at line 207 of file MatrixInjector.py.
00208 : 00209 for (n,d) in self.chainDicts.items(): 00210 for it in d: 00211 if it.startswith("Task") and it!='TaskChain': 00212 #upload 00213 couchID=self.uploadConf(d[it]['ConfigCacheID'], 00214 str(n)+d[it]['TaskName'], 00215 d['CouchURL'] 00216 ) 00217 print d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID 00218 d[it]['ConfigCacheID']=couchID
def MatrixInjector::MatrixInjector::uploadConf | ( | self, | |
filePath, | |||
label, | |||
where | |||
) |
Definition at line 186 of file MatrixInjector.py.
00187 : 00188 labelInCouch=self.label+'_'+label 00189 if self.testMode: 00190 self.count+=1 00191 print '\tFake upload to couch with label',labelInCouch 00192 return self.count 00193 else: 00194 try: 00195 from wmcontrol2_newauth import upload_to_couch,loadConfig 00196 except: 00197 print '\n\tUnable to find wmcontrol modules. Please include it in your python path\n' 00198 print '\n\t QUIT\n' 00199 sys.exit(-16) 00200 print "Loading",filePath,"to",where,"for",label 00201 return upload_to_couch(filePath, 00202 self.group, 00203 self.user, 00204 labelInCouch, 00205 where 00206 )
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.
Definition at line 26 of file MatrixInjector.py.