CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_9_patch3/src/Configuration/PyReleaseValidation/python/MatrixInjector.py

Go to the documentation of this file.
00001 import sys
00002 import json
00003 import os
00004 import copy
00005 
def performInjectionOptionTest(opt):
    """Validate and adjust the command-line options used for wmagent injection.

    *opt* is modified in place according to ``opt.wmcontrol``:

    * ``'init'``  -> ``opt.nThreads = 0`` (configs are only created; test mode)
    * ``'test'``  -> ``opt.dryRun = True`` (workflows already created; dry run)
    * ``'force'`` -> ``opt.dryRun = True`` after an expert-mode warning

    The interpreter exits with status -1 when ``opt.show`` is set, or when
    ``opt.wmcontrol == 'submit'`` is combined with ``opt.nThreads == 0``:
    in both cases the workflows are not run, so there is nothing to inject.
    """
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the workflows.')
        sys.exit(-1)
    if opt.wmcontrol == 'init':
        # init means it'll be in test mode
        opt.nThreads = 0
    if opt.wmcontrol == 'test':
        # the workflows were created already, and we just dry-run them
        opt.dryRun = True
    if opt.wmcontrol == 'submit' and opt.nThreads == 0:
        print('Not injecting to wmagent in -j 0 mode. Need to run the workflows.')
        sys.exit(-1)
    if opt.wmcontrol == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
00022 
class MatrixInjector(object):
    """Turn locally run relval workflows into wmagent TaskChain requests.

    As suggested by the methods below:

    * ``prepare`` builds one request dictionary per workflow from the
      ``<dir>/<step>.io`` files left in each run directory.
    * ``upload`` pushes the step configurations to the couch config cache.
    * ``submit`` sends the requests, or only prints them in test mode.
    """

    def __init__(self, opt, mode='init'):
        """Set up the request templates and wmagent/couch endpoints.

        :param opt: options object; ``opt.keep`` (task names or indices whose
            output is kept) and ``opt.label`` (extra processing-string label)
            are read.
        :param mode: ``'submit'`` or ``'force'`` disable test mode. Any other
            value keeps test mode, where uploads are faked and requests are
            only printed.

        Exits with status -18 if ``WMCORE_ROOT`` is unset and not in test
        mode. Assumes ``CMSSW_VERSION`` is set: otherwise ``os.getenv``
        returns None and building ``self.label`` fails.
        """
        # Fake couch IDs handed out in test mode start after this value.
        self.count = 1040
        self.testMode = ((mode != 'submit') and (mode != 'force'))
        self.version = 1
        self.keep = opt.keep

        # wmagent request manager host
        self.wmagent = os.getenv('WMAGENT_REQMGR')
        if not self.wmagent:
            self.wmagent = 'cmsweb.cern.ch'

        # couch stuff
        self.couch = 'https://' + self.wmagent + '/couchdb'
        self.couchDB = 'reqmgr_config_cache'
        # Maps a config file's base name to its couch ID, so identical
        # configs are uploaded only once and then recycled.
        self.couchCache = {}
        self.user = os.getenv('USER')
        self.group = 'ppd'
        self.label = 'RelValSet_' + os.getenv('CMSSW_VERSION').replace('-', '') + '_v' + str(self.version)
        self.speciallabel = ''
        if opt.label:
            self.speciallabel = '_' + opt.label

        if not os.getenv('WMCORE_ROOT'):
            print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-18)
        else:
            print('\n\tFound wmclient\n')

        # Template of the whole request. Keys prefixed with "nowm" are
        # bookkeeping used while building the request and are removed before
        # it is stored.
        self.defaultChain = {
            "RequestType": "TaskChain",                       # this is how we handle relvals
            "AcquisitionEra": {},                             # Acq Era
            "ProcessingString": {},                           # processing string to label the dataset
            "Requestor": self.user,                           # Person responsible
            "Group": self.group,                              # group for the request
            "CMSSWVersion": os.getenv('CMSSW_VERSION'),       # CMSSW Version (used for all tasks in chain)
            "Campaign": os.getenv('CMSSW_VERSION'),           # only for wmstat purpose
            "ScramArch": os.getenv('SCRAM_ARCH'),             # Scram Arch (used for all tasks in chain)
            "ProcessingVersion": self.version,                # Processing Version (used for all tasks in chain)
            "GlobalTag": None,                                # Global Tag (overridden per task)
            "CouchURL": self.couch,                           # URL of CouchDB containing Config Cache
            "CouchDBName": self.couchDB,                      # Name of Couch Database containing config cache
            # - Will contain all configs for all Tasks
            "SiteWhitelist": ["T2_CH_CERN", "T1_US_FNAL"],    # Site whitelist
            "TaskChain": None,                                # Define number of tasks in chain.
            "nowmTasklist": [],                               # a list of tasks as we put them in
            "unmergedLFNBase": "/store/unmerged",
            "mergedLFNBase": "/store/relval",
            "dashboardActivity": "relval",
            "Memory": 2400,
            "SizePerEvent": 1234,
            "TimePerEvent": 20
            }

        # Merged into the request when a HARVEST task is found.
        self.defaultHarvest = {
            "EnableDQMHarvest": 1,
            "DQMUploadUrl": "https://cmsweb.cern.ch/dqm/relval",
            "DQMConfigCacheID": None
            }

        # Template for a task generating events from scratch (first step).
        self.defaultScratch = {
            "TaskName": None,                                 # Task Name
            "ConfigCacheID": None,                            # Generator Config id
            "GlobalTag": None,
            "SplittingAlgorithm": "EventBased",               # Splitting Algorithm
            "SplittingArguments": {"events_per_job": None},   # Size of jobs in terms of splitting algorithm
            "RequestNumEvents": None,                         # Total number of events to generate
            "Seeding": "AutomaticSeeding",                    # Random seeding method
            "PrimaryDataset": None,                           # Primary Dataset to be created
            "nowmIO": {},
            "KeepOutput": False
            }
        # Template for a task reading an existing input dataset.
        self.defaultInput = {
            "TaskName": "DigiHLT",                            # Task Name
            "ConfigCacheID": None,                            # Processing Config id
            "GlobalTag": None,
            "InputDataset": None,                             # Input Dataset to be processed
            "SplittingAlgorithm": "LumiBased",                # Splitting Algorithm
            "SplittingArguments": {"lumis_per_job": 10},      # Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "KeepOutput": False
            }
        # Template for a task chained on the output of a previous task.
        self.defaultTask = {
            "TaskName": None,                                 # Task Name
            "InputTask": None,                                # Input Task Name (Task Name field of a previous Task entry)
            "InputFromOutputModule": None,                    # OutputModule name in the input task that will provide files to process
            "ConfigCacheID": None,                            # Processing Config id
            "GlobalTag": None,
            "SplittingAlgorithm": "LumiBased",                # Splitting Algorithm
            "SplittingArguments": {"lumis_per_job": 10},      # Size of jobs in terms of splitting algorithm
            "nowmIO": {},
            "KeepOutput": False
            }

        # Finished request dictionaries, keyed by workflow number.
        self.chainDicts = {}

    def _readIO(self, wfDir, step):
        """Return the parsed ``<wfDir>/<step>.io`` JSON, or None after printing why it is unavailable."""
        ioPath = '%s/%s.io' % (wfDir, step)
        try:
            with open(ioPath) as ioFile:
                return json.load(ioFile)
        except (IOError, OSError, ValueError):
            print('Failed to find %s. The workflows were probably not run or the cfg not created' % (ioPath,))
            return None

    def prepare(self, mReader, directories, mode='init'):
        """Build one TaskChain request dictionary per workflow.

        For every ``(n, dir)`` in *directories*, the steps of
        ``mReader.workFlowSteps`` whose number is ``n`` become tasks:

        * the first non-input step is generated from scratch;
        * a step following a dataset input reads that dataset;
        * any other step is chained on a previous task.

        Each task uses the ``<dir>/<step>.io`` JSON written when the
        workflow was run. Results are stored in ``self.chainDicts[n]``.

        :param mReader: object exposing ``workFlowSteps``, a dict keyed by
            ``(num, prefix)`` with values ``(num, name, commands, stepList)``.
        :param directories: dict mapping workflow number to its run directory.
        :param mode: unused; kept for interface compatibility.
        :returns: 0 on success. Returns -15 if a ``.io`` file is missing or
            unreadable, and -12 if a from-scratch step lacks ``--relval``
            splitting information.
        """
        try:
            from Configuration.PyReleaseValidation.relval_steps import wmsplit
            import pprint
            pprint.pprint(wmsplit)
        except Exception:
            # Best effort: without the table there is no per-step splitting override.
            print('Not set up for step splitting')
            wmsplit = {}

        # Hard-wired switch. When True, acquisition era and processing string
        # are set per step at the request level instead of inside each task.
        acqEra = False
        for (n, wfDir) in directories.items():
            chainDict = copy.deepcopy(self.defaultChain)
            tasks = chainDict['nowmTasklist']
            print('inspecting %s' % (wfDir,))
            nextHasDSInput = None
            for (x, s) in mReader.workFlowSteps.items():
                # x has the format (num, prefix)
                # s has the format (num, name, commands, stepList)
                if x[0] != n:
                    continue
                wfName = s[1].split('+')[0]
                chainDict['RequestString'] = 'RV' + chainDict['CMSSWVersion'] + wfName
                splitForThisWf = None
                thisLabel = self.speciallabel
                processStrPrefix = ''
                for (index, step) in enumerate(s[3]):
                    command = s[2][index]
                    if 'INPUT' in step or not isinstance(command, str):
                        # Dataset-input placeholder: it is consumed by the next step.
                        nextHasDSInput = command
                        continue

                    ioInfo = self._readIO(wfDir, step)
                    if ioInfo is None:
                        return -15

                    if index == 0:
                        # first step and not input -> gen part
                        task = copy.deepcopy(self.defaultScratch)
                        task['nowmIO'] = ioInfo
                        tasks.append(task)
                        task['PrimaryDataset'] = 'RelVal' + wfName
                        if '--relval' not in command:
                            print('Impossible to create task from scratch without splitting information with --relval')
                            return -12
                        args = command.split()
                        # --relval takes "<total events>,<events per job>"
                        ns = [int(v) for v in args[args.index('--relval') + 1].split(',')]
                        task['RequestNumEvents'] = ns[0]
                        task['SplittingArguments']['events_per_job'] = ns[1]
                        if 'FASTSIM' in command:
                            thisLabel += '_FastSim'

                    elif nextHasDSInput:
                        task = copy.deepcopy(self.defaultInput)
                        task['nowmIO'] = ioInfo
                        tasks.append(task)
                        task['InputDataset'] = nextHasDSInput.dataSet
                        # The input's splitting is reused by the later chained tasks.
                        splitForThisWf = nextHasDSInput.split
                        task['SplittingArguments']['lumis_per_job'] = splitForThisWf
                        if step in wmsplit:
                            task['SplittingArguments']['lumis_per_job'] = wmsplit[step]
                        # get the run numbers or #events
                        if len(nextHasDSInput.run):
                            task['RunWhitelist'] = nextHasDSInput.run
                        if '--data' in command and nextHasDSInput.label:
                            thisLabel += '_RelVal_%s' % nextHasDSInput.label
                        if 'filter' in task['nowmIO']:
                            print('This has an input DS and a filter sequence: very likely to be the PyQuen sample')
                            processStrPrefix = 'PU_'
                            task['PrimaryDataset'] = 'RelVal' + wfName
                        nextHasDSInput = None

                    else:
                        # not first step and no inputDS
                        task = copy.deepcopy(self.defaultTask)
                        task['nowmIO'] = ioInfo
                        tasks.append(task)
                        if splitForThisWf:
                            task['SplittingArguments']['lumis_per_job'] = splitForThisWf
                        if step in wmsplit:
                            task['SplittingArguments']['lumis_per_job'] = wmsplit[step]

                    globalTag = task['nowmIO']['GT']
                    task['TaskName'] = step
                    task['ConfigCacheID'] = '%s/%s.py' % (wfDir, step)
                    task['GlobalTag'] = globalTag  # copy to the proper parameter name
                    chainDict['GlobalTag'] = globalTag  # the request-level GT ends up being the last task's
                    if 'pileup' in task['nowmIO']:
                        task['MCPileup'] = task['nowmIO']['pileup']
                    if '--pileup' in command:
                        processStrPrefix = 'PU_'
                    processingString = processStrPrefix + globalTag.replace('::All', '') + thisLabel
                    if acqEra:
                        chainDict['AcquisitionEra'][step] = chainDict['CMSSWVersion']
                        chainDict['ProcessingString'][step] = processingString
                    else:
                        task['AcquisitionEra'] = chainDict['CMSSWVersion']
                        task['ProcessingString'] = processingString

            # Link each task reading a "primary" file to an earlier task whose
            # output mentions that file, scanning backwards.
            # NOTE: only the loop over output modules is broken out of. If
            # several earlier tasks match, the earliest one in the list wins.
            for i_second in reversed(range(len(tasks))):
                t_second = tasks[i_second]
                if 'primary' not in t_second['nowmIO']:
                    continue
                primary = t_second['nowmIO']['primary'][0].replace('file:', '')
                for t_input in reversed(tasks[:i_second]):
                    for (om, o) in t_input['nowmIO'].items():
                        if primary in o:
                            t_second['InputTask'] = t_input['TaskName']
                            t_second['InputFromOutputModule'] = om
                            if t_second['TaskName'].startswith('HARVEST'):
                                # The harvesting info goes in the general dict, not in a task.
                                chainDict.update(copy.deepcopy(self.defaultHarvest))
                                chainDict['DQMConfigCacheID'] = t_second['ConfigCacheID']
                            break

            # there is in fact only one acquisition era
            if acqEra:
                chainDict['AcquisitionEra'] = list(chainDict['AcquisitionEra'].values())[0]

            # clean things up now
            if self.keep:
                # integer entries of keep select tasks by position
                for i in self.keep:
                    if type(i) == int and i < len(tasks):
                        tasks[i]['KeepOutput'] = True
            itask = 0
            for t in tasks:
                # HARVEST tasks are not emitted as tasks (see DQMConfigCacheID above)
                if t['TaskName'].startswith('HARVEST'):
                    continue
                if not self.keep or t['TaskName'] in self.keep:
                    t['KeepOutput'] = True
                t.pop('nowmIO')
                itask += 1
                chainDict['Task%d' % itask] = t

            # provide the number of tasks
            chainDict['TaskChain'] = itask
            chainDict.pop('nowmTasklist')
            self.chainDicts[n] = chainDict

        return 0

    def uploadConf(self, filePath, label, where):
        """Upload one configuration file to the couch config cache and return its ID.

        In test mode nothing is uploaded. A new fake ID (an incrementing
        counter) is returned instead. Otherwise a config whose base file name
        was already uploaded is not re-uploaded, and its cached ID is
        returned. Exits with status -16 if ``modules.wma`` cannot be imported.
        """
        labelInCouch = self.label + '_' + label
        cacheName = filePath.split('/')[-1]
        if self.testMode:
            self.count += 1
            print('\tFake upload of %s to couch with label %s' % (filePath, labelInCouch))
            return self.count
        try:
            from modules.wma import upload_to_couch
        except ImportError:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            print('\n\t QUIT\n')
            sys.exit(-16)
        if cacheName in self.couchCache:
            print('Not re-uploading %s to %s for %s' % (filePath, where, label))
            cacheId = self.couchCache[cacheName]
        else:
            print('Loading %s to %s for %s' % (filePath, where, label))
            cacheId = upload_to_couch(filePath,
                                      labelInCouch,
                                      self.user,
                                      self.group,
                                      test_mode=False,
                                      url=where
                                      )
            self.couchCache[cacheName] = cacheId
        return cacheId

    def upload(self):
        """Upload every task and harvesting config of each prepared request.

        The local config paths in ``ConfigCacheID`` / ``DQMConfigCacheID``
        are replaced in place by the IDs returned from ``uploadConf``.
        """
        for (n, d) in self.chainDicts.items():
            for it in d:
                if it.startswith("Task") and it != 'TaskChain':
                    couchID = self.uploadConf(d[it]['ConfigCacheID'],
                                              str(n) + d[it]['TaskName'],
                                              d['CouchURL']
                                              )
                    print('%s uploaded to couchDB for %s with ID %s' % (d[it]['ConfigCacheID'], n, couchID))
                    d[it]['ConfigCacheID'] = couchID
                if it == 'DQMConfigCacheID':
                    couchID = self.uploadConf(d['DQMConfigCacheID'],
                                              str(n) + 'harvesting',
                                              d['CouchURL']
                                              )
                    print('%s uploaded to couchDB for %s with ID %s' % (d['DQMConfigCacheID'], n, couchID))
                    d['DQMConfigCacheID'] = couchID

    def submit(self):
        """Print each prepared request and, unless in test mode, submit and approve it.

        Exits with status -17 if the wmcontrol modules cannot be imported
        outside test mode.
        """
        try:
            from modules.wma import makeRequest, approveRequest
            from wmcontrol import random_sleep
            print('\n\tFound wmcontrol\n')
        except ImportError:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-17)

        import pprint
        for (n, d) in self.chainDicts.items():
            if self.testMode:
                print('Only viewing request %s' % (n,))
                # pprint.pprint prints and returns None; do not print its result.
                pprint.pprint(d)
            else:
                # submit to wmagent each dict
                print('For eyes before submitting %s' % (n,))
                pprint.pprint(d)
                print('Submitting %s ...........' % (n,))
                workFlow = makeRequest(self.wmagent, d, encodeDict=True)
                approveRequest(self.wmagent, workFlow)
                print('........... %s submitted' % (n,))
                random_sleep()
00369             
00370 
00371