CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_4/src/CalibMuon/DTCalibration/python/Workflow/crabWrap.py

Go to the documentation of this file.
00001 import os,time,sys
00002 from crab import Crab,common,parseOptions,CrabException
00003 from crabStatusFromReport import crabStatusFromReport
00004 
00005 def computeSummaryCRAB260(up_task):
00006     """
00007     Computes jobs summary for given task
00008     """
00009     taskId = str(up_task['name'])
00010     task_unique_name = str(up_task['name'])
00011     ended = None
00012 
00013     summary = {}
00014     nJobs = 0
00015     for job in up_task.jobs :
00016         id = str(job.runningJob['jobId'])
00017         jobStatus =  str(job.runningJob['statusScheduler'])
00018         jobState =  str(job.runningJob['state'])
00019         dest = str(job.runningJob['destination']).split(':')[0]
00020         exe_exit_code = str(job.runningJob['applicationReturnCode'])
00021         job_exit_code = str(job.runningJob['wrapperReturnCode'])
00022         ended = str(job['closed'])  
00023         printline=''
00024         if dest == 'None' :  dest = ''
00025         if exe_exit_code == 'None' :  exe_exit_code = ''
00026         if job_exit_code == 'None' :  job_exit_code = ''
00027         if job.runningJob['state'] == 'SubRequested' : jobStatus = 'Submitting'
00028         if job.runningJob['state'] == 'Terminated': jobStatus = 'Done'
00029         
00030         if summary.has_key(jobStatus): summary[jobStatus] += 1
00031         else: summary[jobStatus] = 1 
00032         nJobs += 1
00033 
00034     for item in summary: summary[item] = 100.*summary[item]/nJobs 
00035 
00036     return summary
00037 
00038 def computeSummaryCRAB251(up_task):
00039     "Computes jobs summary for given task" 
00040  
00041     taskId = str(up_task['name'])
00042     task_unique_name = str(up_task['name'])
00043     ended = None
00044 
00045     summary = {}
00046     nJobs = 0
00047     for job in up_task.jobs :
00048         id = str(job.runningJob['jobId'])
00049         jobStatus =  str(job.runningJob['statusScheduler'])
00050         dest = str(job.runningJob['destination']).split(':')[0]
00051         exe_exit_code = str(job.runningJob['applicationReturnCode'])
00052         job_exit_code = str(job.runningJob['wrapperReturnCode'])
00053         ended = str(job['standardInput'])  
00054         printline=''
00055         if dest == 'None' :  dest = ''
00056         if exe_exit_code == 'None' :  exe_exit_code = ''
00057         if job_exit_code == 'None' :  job_exit_code = ''
00058         #printline+="%-6s %-18s %-36s %-13s %-16s %-4s" % (id,jobStatus,dest,exe_exit_code,job_exit_code,ended)
00059         #print printline
00060         if summary.has_key(jobStatus): summary[jobStatus] += 1
00061         else: summary[jobStatus] = 1 
00062         nJobs += 1
00063 
00064     for item in summary: summary[item] = 100.*summary[item]/nJobs 
00065 
00066     return summary
00067 
00068 computeSummary = computeSummaryCRAB260
00069 
00070 def summaryStandAlone(self):
00071     """
00072     Returns jobs summary
00073     """
00074     task = common._db.getTask()
00075     upTask = common.scheduler.queryEverything(task['id'])
00076     return computeSummary(upTask)
00077 
00078 def summaryServer(self):
00079     """
00080     Returns jobs summary
00081     """
00082     #self.resynchClientSide()
00083         
00084     upTask = common._db.getTask()  
00085     return computeSummary(upTask)
00086 
00087 """
00088 # Add method to Status classes
00089 import Status
00090 import StatusServer
00091 Status.Status.summary = summaryStandAlone
00092 StatusServer.StatusServer.summary = summaryServer
00093 """
00094 
00095 def crabAction(options, action = None):
00096 
00097     options = parseOptions(options)
00098 
00099     crab = Crab()
00100     result = None
00101     try:
00102         crab.initialize_(options)
00103         crab.run()
00104         if action: result = action(crab)
00105         del crab
00106         print 'Log file is %s%s.log'%(common.work_space.logDir(),common.prog_name) 
00107     except CrabException, e:
00108         del crab
00109         #print '\n' + common.prog_name + ': ' + str(e) + '\n' 
00110         raise
00111         
00112     if (common.logger): common.logger.delete()
00113 
00114     if result: return result
00115 
00116 def crabActionCRAB251(options, action = None):
00117 
00118     options = parseOptions(options)
00119 
00120     result = None
00121     try:
00122         crab = Crab(options)
00123         crab.run()
00124         common.apmon.free()
00125         if action: result = action(crab)
00126         del crab
00127         #print 'Log file is %s%s.log'%(common.work_space.logDir(),common.prog_name)  
00128         #print '\n##############################  E N D  ####################################\n'
00129     except CrabException, e:
00130         print '\n' + common.prog_name + ': ' + str(e) + '\n'
00131         pass
00132     pass
00133     #if (common.logger): common.logger.delete()
00134 
00135     if result: return result
00136 
00137 def crabCreate(dir = '.', crabCfg_name = 'crab.cfg'):
00138 
00139     cwd = os.getcwd()
00140     os.chdir(dir)
00141 
00142     options = ['-create','-cfg',crabCfg_name]
00143 
00144     project = crabAction(options,lambda crab: common.work_space.topDir())
00145 
00146     os.chdir(cwd)
00147 
00148     return project
00149 
00150 def crabSubmit(project):
00151     options = ['-submit','-c',project]
00152 
00153     crabAction(options)
00154 
00155     return
00156 
00157 def crabStatus(project):
00158     options = ['-status']
00159     if project:
00160         options.append('-c')
00161         options.append(project)
00162 
00163     def action(crab):
00164         #act = '-status'
00165         #return crab.actions[act].summary()
00166         xml = crab.cfg_params.get("USER.xml_report",'')
00167         return common.work_space.shareDir() + xml
00168         
00169     xmlreport = crabAction(options,action)
00170     status = crabStatusFromReport(xmlreport)
00171  
00172     return status
00173 
00174 def convertStatus(status):
00175     """
00176     doneStatus = ['Done','Done (success)','Cleared','Retrieved']
00177     failedStatus = ['Aborted','Done (failed)','Killed','Cancelled']
00178     ignoreStatus = ['Created']
00179     """
00180     doneStatus = ['SD','E']
00181     failedStatus = ['A','DA','K']
00182     runningStatus = ['R']
00183     ignoreStatus = ['C']
00184     sumDone = 0.0
00185     sumFailed = 0.0
00186     sumRunning = 0.0
00187     sumIgnore = 0.0
00188     for key in status:
00189         if key in doneStatus: sumDone += status[key]
00190         if key in failedStatus: sumFailed += status[key]
00191         if key in runningStatus: sumRunning += status[key]
00192         if key in ignoreStatus: sumIgnore += status[key]
00193 
00194     # frac(done)' = N*frac(done)/(N - N*frac(ignore)) = frac(done)/(1 - frac(ignore))
00195     fracDone = 100.0*sumDone/(100.0 - sumIgnore)
00196     fracFailed = 100.0*sumFailed/(100.0 - sumIgnore)
00197     fracRun = 100.0*sumRunning/(100.0 - sumIgnore)
00198 
00199     result = {'Finished':fracDone,
00200               'Failed':fracFailed,
00201               'Running':fracRun}
00202 
00203     return result 
00204 
00205 def checkStatus(project, threshold = 95.0):
00206 
00207     status = crabStatus(project)
00208  
00209     print "Percentage of jobs per status:"
00210     maxLength = max( [len(x) for x in status] )
00211     for item in status:
00212         print "%*s: %.0f%%" % (maxLength,item,status[item])
00213 
00214 
00215     statusNew = convertStatus(status)
00216        
00217     print "Relative percentage finished: %.0f%%" % statusNew['Finished']
00218     print "Relative percentage failed  : %.0f%%" % statusNew['Failed']
00219     print "Relative percentage running : %.0f%%" % statusNew['Running']
00220 
00221     finished = False
00222     # Condition for stopping
00223     #if fracFailed > 50.0: raise RuntimeError,'Too many jobs have failed (%.0f%%).' % fracFailed
00224 
00225     # Condition for considering it finished
00226     if statusNew['Finished'] >= threshold: finished = True 
00227 
00228     return finished
00229 
00230 def getOutput(project):
00231     options = ['-getoutput']
00232     if project:
00233         options.append('-c')
00234         options.append(project)
00235 
00236     crabAction(options)
00237 
00238     return
00239 
00240 def crabWatch(action,project = None, threshold = 95.0):
00241     #for i in range(5):
00242     while True:
00243         if checkStatus(project,threshold): break
00244         time.sleep(180)
00245  
00246     print "Finished..."
00247 
00248     action(project)
00249   
00250     return
00251 
00252 def initCrabEnvironment():
00253     pythonpathenv = os.environ['PYTHONPATH']
00254     pythonpathbegin = pythonpathenv.split(':')[0].rstrip('/')
00255     pythonpathend = pythonpathenv.split(':')[-1].rstrip('/')
00256 
00257     indexBegin = sys.path.index(pythonpathbegin)
00258     if os.environ.has_key('CRABPSETPYTHON'): sys.path.insert( indexBegin, os.environ['CRABPSETPYTHON'] )
00259     if os.environ.has_key('CRABDLSAPIPYTHON'): sys.path.insert( indexBegin, os.environ['CRABDLSAPIPYTHON'] )
00260     if os.environ.has_key('CRABDBSAPIPYTHON'): sys.path.insert( indexBegin, os.environ['CRABDBSAPIPYTHON'] )
00261 
00262     if os.environ['SCRAM_ARCH'].find('32') != -1 and os.environ.has_key('CRABPYSQLITE'):
00263         sys.path.insert( indexBegin, os.environ['CRABPYSQLITE'] )
00264     elif os.environ['SCRAM_ARCH'].find('64') != -1 and os.environ.has_key('CRABPYSQLITE64'):
00265         sys.path.insert( indexBegin, os.environ['CRABPYSQLITE64'] )
00266 
00267     indexEnd = sys.path.index(pythonpathend) + 1
00268     if os.environ.has_key('CRABPYTHON'):
00269         if indexEnd >= len(sys.path): sys.path.append( os.environ['CRABPYTHON'] )
00270         else: sys.path.insert( indexEnd, os.environ['CRABPYTHON'] )
00271 
00272     #print sys.path
00273 
00274     os.environ['LD_LIBRARY_PATH'] = os.environ['GLITE_LOCATION'] + '/lib' + ':' + os.environ['LD_LIBRARY_PATH']
00275     os.environ['VOMS_PROXY_INFO_DONT_VERIFY_AC'] = '1'
00276     #print os.environ['LD_LIBRARY_PATH']
00277     #print os.environ['VOMS_PROXY_INFO_DONT_VERIFY_AC'] 
00278     
00279     """ 
00280     export LD_LIBRARY_PATH=${GLITE_LOCATION}/lib:${LD_LIBRARY_PATH}
00281     export VOMS_PROXY_INFO_DONT_VERIFY_AC=1
00282     """
00283    
00284     ## Get rid of some useless warning
00285     try:
00286         import warnings
00287         warnings.simplefilter("ignore", RuntimeWarning)
00288         # import socket
00289         # socket.setdefaulttimeout(15) # Default timeout in seconds
00290     except ImportError:
00291         pass # too bad, you'll get the warning
00292 
00293     # Remove libraries which over-ride CRAB libs and DBS_CONFIG setting
00294     badPaths = []
00295     if os.environ.has_key('DBSCMD_HOME'): # CMSSW's DBS, remove last bit of path
00296         badPaths.append('/'.join(os.environ['DBSCMD_HOME'].split('/')[:-1]))
00297     if os.environ.has_key('DBS_CLIENT_CONFIG'):
00298         del os.environ['DBS_CLIENT_CONFIG']
00299 
00300     def pathIsGood(checkPath):
00301         """
00302         Filter function for badPaths
00303         """
00304         for badPath in badPaths:
00305             if checkPath.find(badPath) != -1:
00306                 return False
00307         return True
00308 
00309     sys.path = filter(pathIsGood, sys.path)
00310 
00311 def run(project = None, threshold = 95.0):
00312 
00313     crabWatch(getOutput,project,threshold)
00314     
00315     return
00316 
00317 if __name__ == '__main__':
00318     project = None
00319     threshold = 95.0
00320     for opt in sys.argv:
00321         if opt[:8] == 'project=':
00322             project = opt[8:]
00323             print "Running on CRAB project",project
00324         if opt[:10] == 'threshold=':
00325             threshold = float(opt[10:])
00326             print "Using threshold",threshold 
00327     
00328     run(project,threshold)