CMS 3D CMS Logo

Functions | Variables

BeamSpotWorkflow Namespace Reference

Functions

def aselectFilesToProcess
def compareLumiLists
def getLastClosedRun
def getLastUploadedIOV
 General functions.
def getListOfFilesToProcess
def getListOfRunsAndLumiFromDBS
def getListOfRunsAndLumiFromRR
def getNewRunList
def getNumberOfFilesToProcessForRun
def getRunNumberFromDBSName
def getRunNumberFromFileName
def main
def removeUncompleteRuns
def selectFilesToProcess

Variables

string error = "Please set a crab environment in order to get the proper JSON lib"

Function Documentation

def BeamSpotWorkflow::aselectFilesToProcess (   listOfFilesToProcess,
  newRunList 
)

Definition at line 459 of file BeamSpotWorkflow.py.

00460 def aselectFilesToProcess(listOfFilesToProcess,newRunList):
00461     selectedFiles = []
00462     runsToProcess = {}
00463     processedRuns = {}
00464     for file in listOfFilesToProcess:
00465         run = getRunNumberFromDBSName(file)
00466 #        print "To process: " + str(run) 
00467         if run not in runsToProcess:
00468             runsToProcess[run] = 1
00469         else:
00470             runsToProcess[run] = runsToProcess[run] + 1 
00471 
00472     for file in newRunList:
00473         run = getRunNumberFromFileName(file)
00474 #        print "Processed: " + str(run)
00475         if run not in processedRuns:
00476             processedRuns[run] = 1
00477         else:
00478             processedRuns[run] = processedRuns[run] + 1 
00479 
00480     #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
00481     lastClosedRun = getLastClosedRun(listOfFilesToProcess)
00482 #    print "LastClosedRun:-" + str(lastClosedRun) + "-"
00483 
00484     processedRunsKeys = processedRuns.keys()
00485     processedRunsKeys.sort()
00486 
00487     for run in processedRunsKeys:
00488         if run <= lastClosedRun :
00489             print "For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!"
00490             if not run in runsToProcess:
00491                 exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
00492             lumiList = getDBSLumiListForRun(run)
00493             if processedRuns[run] == runsToProcess[run]:
00494                 for file in newRunList:
00495                     if run == getRunNumberFromFileName(file):
00496                         selectedFiles.append(file)
00497             else:
00498                 exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
00499     return selectedFiles            

def BeamSpotWorkflow::compareLumiLists (   listA,
  listB,
  errors = [],
  tolerance = 0 
)

Definition at line 404 of file BeamSpotWorkflow.py.

00405 def compareLumiLists(listA,listB,errors=[],tolerance=0):
00406     lenA = len(listA)
00407     lenB = len(listB)
00408     if lenA < lenB-(lenB*float(tolerance)/100):
00409         errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")
00410     #else:
00411         #errors.append("Lumi check ok!listA(" + str(lenA) + ")-(" + str(lenB) + ")listB")
00412     #print errors
00413     listA.sort()
00414     listB.sort()
00415     #shorter = lenA
00416     #if lenB < shorter:
00417     #    shorter = lenB
00418     #a = 0
00419     #b = 0
00420     badA = []
00421     badB = []
00422     #print listB
00423     #print listA
00424     #print len(listA)
00425     #print len(listB)
00426     #counter = 1
00427     for lumi in listA:
00428         #print str(counter) + "->" + str(lumi)
00429         #counter += 1
00430         if not lumi in listB:
00431             errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
00432             badB.append(lumi)
00433             #print "Bad B: " + str(lumi)
00434     #exit("hola")
00435     for lumi in listB:
00436         if not lumi in listA:
00437             errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
00438             badA.append(lumi)
00439             #print "Bad A: " + str(lumi)
00440             
00441     return badA,badB

def BeamSpotWorkflow::getLastClosedRun (   DBSListOfFiles)

Definition at line 186 of file BeamSpotWorkflow.py.

00187 def getLastClosedRun(DBSListOfFiles):
00188     runs = []
00189     for file in DBSListOfFiles:
00190         runNumber = getRunNumberFromDBSName(file)
00191         if runs.count(runNumber) == 0: 
00192             runs.append(runNumber)
00193 
00194     if len(runs) <= 1: #No closed run
00195         return -1
00196     else:
00197         runs.sort()
00198         return long(runs[len(runs)-2])
    
def BeamSpotWorkflow::getLastUploadedIOV (   tagName,
  destDB = "oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT" 
)

General functions.

Definition at line 56 of file BeamSpotWorkflow.py.

00056 def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
00057     #return 582088327592295
00058     listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName 
00059     dbError = commands.getstatusoutput( listIOVCommand )
00060     if dbError[0] != 0 :
00061         if dbError[1].find("metadata entry \"" + tagName + "\" does not exist") != -1:
00062             print "Creating a new tag because I got the following error contacting the DB"
00063             print dbError[1]
00064             return 1
00065             #return 133928
00066         else:
00067             exit("ERROR: Can\'t connect to db because:\n" + dbError[1])
00068 
00069 
00070     aCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
00071     output = commands.getstatusoutput( aCommand )
00072     
00073     #WARNING when we pass to lumi IOV this should be long long
00074     if output[1] == '':
00075       exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")
00076       
00077     return long(output[1])
00078 
def BeamSpotWorkflow::getListOfFilesToProcess (   dataSet,
  lastRun = -1 
)

Definition at line 80 of file BeamSpotWorkflow.py.

00081 def getListOfFilesToProcess(dataSet,lastRun=-1):
00082     queryCommand = "dbs --search --query \"find file where dataset=" + dataSet
00083     if lastRun != -1:
00084         queryCommand = queryCommand + " and run > " + str(lastRun)
00085     queryCommand = queryCommand + "\" | grep .root"    
00086 #    print " >> " + queryCommand
00087     output = commands.getstatusoutput( queryCommand )
00088     return output[1].split('\n')

def BeamSpotWorkflow::getListOfRunsAndLumiFromDBS (   dataSet,
  lastRun = -1 
)

Definition at line 100 of file BeamSpotWorkflow.py.

00101 def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
00102     datasetList = dataSet.split(',')
00103     outputList = []
00104     for data in datasetList:
00105         queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
00106         if lastRun != -1:
00107             queryCommand = queryCommand + " and run > " + str(lastRun)
00108         queryCommand += "\""
00109         print " >> " + queryCommand
00110         output = []
00111         for i in range(0,3):
00112             output = commands.getstatusoutput( queryCommand )
00113             if output[0] == 0 and not (output[1].find("ERROR") != -1 or output[1].find("Error") != -1) :
00114                 break
00115         if output[0] != 0:
00116             exit("ERROR: I can't contact DBS for the following reason:\n" + output[1])
00117         #print output[1]
00118         tmpList = output[1].split('\n')
00119         for file in tmpList:
00120             outputList.append(file)
00121     runsAndLumis = {}
00122     for out in outputList:
00123         regExp = re.search('(\d+)\s+(\d+)',out)
00124         if regExp:
00125             run  = long(regExp.group(1))
00126             lumi = long(regExp.group(2))
00127             if not run in runsAndLumis:
00128                 runsAndLumis[run] = []
00129             runsAndLumis[run].append(lumi)
00130 
00131 #    print runsAndLumis
00132 #    exit("ok")
00133     return runsAndLumis

def BeamSpotWorkflow::getListOfRunsAndLumiFromRR (   lastRun = -1)

Definition at line 135 of file BeamSpotWorkflow.py.

00136 def getListOfRunsAndLumiFromRR(lastRun=-1):
00137     RunReg  ="https://pccmsdqm04.cern.ch/runregistry"
00138     #RunReg  = "https://localhost:40010/runregistry"
00139     #Dataset=%Online%
00140     Group   = "Collisions10"
00141 
00142     # get handler to RR XML-RPC server
00143     FULLADDRESS=RunReg + "/xmlrpc"
00144     #print "RunRegistry from: ",FULLADDRESS
00145     server = xmlrpclib.ServerProxy(FULLADDRESS)
00146     #sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(lastRun) + " and {datasetName} LIKE '" + Dataset + "'"
00147     sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(lastRun) 
00148     sel_dcstable="{groupName} ='" + Group + "' and {runNumber} > " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"
00149 
00150     tries = 0;
00151     while tries<10:
00152         try:
00153             run_data = server.DataExporter.export('RUN'           , 'GLOBAL', 'csv_runs', sel_runtable)
00154             dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json'    , sel_dcstable)
00155             break
00156         except:
00157             print "Something wrong in accessing runregistry, retrying in 5s...."
00158             tries += 1
00159             time.sleep(5)
00160         if tries==10:
00161             error = "Run registry unaccessible.....exiting now"
00162             exit(error)
00163     
00164 
00165     listOfRuns=[]
00166     for line in run_data.split("\n"):
00167         run=line.split(',')[0]
00168         if run.isdigit():
00169             listOfRuns.append(run)
00170 
00171 
00172     selected_dcs={}
00173     jsonList=json.loads(dcs_data)
00174 
00175     #for element in jsonList:
00176     for element in listOfRuns:
00177         #if element in listOfRuns:
00178         if element in jsonList:
00179             selected_dcs[long(element)]=jsonList[element]
00180         else:
00181             print "WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!" 
00182             selected_dcs[long(element)]= [[]] 
00183     #print selected_dcs        
00184     return selected_dcs

def BeamSpotWorkflow::getNewRunList (   fromDir,
  lastUploadedIOV 
)

Definition at line 215 of file BeamSpotWorkflow.py.

00216 def getNewRunList(fromDir,lastUploadedIOV):
00217     newRunList = []
00218     listOfFiles = ls(fromDir,".txt")
00219     runFileMap = {}
00220     for fileName in listOfFiles:
00221         runNumber = getRunNumberFromFileName(fileName) 
00222         if runNumber > lastUploadedIOV:
00223             newRunList.append(fileName)
00224     return newRunList        

def BeamSpotWorkflow::getNumberOfFilesToProcessForRun (   dataSet,
  run 
)

Definition at line 90 of file BeamSpotWorkflow.py.

00091 def getNumberOfFilesToProcessForRun(dataSet,run):
00092     queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
00093     #print " >> " + queryCommand
00094     output = commands.getstatusoutput( queryCommand )
00095     if output[0] != 0:
00096         return 0
00097     else:
00098         return len(output[1].split('\n'))

def BeamSpotWorkflow::getRunNumberFromDBSName (   fileName)

Definition at line 208 of file BeamSpotWorkflow.py.

00209 def getRunNumberFromDBSName(fileName):
00210     regExp = re.search('(\D+)/(\d+)/(\d+)/(\d+)/(\D+)',fileName)
00211     if not regExp:
00212         return -1
00213     return long(regExp.group(3)+regExp.group(4))
    
def BeamSpotWorkflow::getRunNumberFromFileName (   fileName)

Definition at line 200 of file BeamSpotWorkflow.py.

00201 def getRunNumberFromFileName(fileName):
00202 #    regExp = re.search('(\D+)_(\d+)_(\d+)_(\d+)',fileName)
00203     regExp = re.search('(\D+)_(\d+)_(\d+)_',fileName)
00204     if not regExp:
00205         return -1
00206     return long(regExp.group(3))

def BeamSpotWorkflow::main ( )

Definition at line 501 of file BeamSpotWorkflow.py.

00502 def main():
00503     ######### COMMAND LINE OPTIONS ##############
00504     option,args = parse(__doc__)
00505 
00506     ######### Check if there is already a megascript running ########
00507     if option.lock:
00508         setLockName('.' + option.lock)
00509         if checkLock():
00510             print "There is already a megascript runnning...exiting"
00511             return
00512         else:
00513             lock()
00514             
00515 
00516     destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
00517     if option.Test:
00518         destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'
00519 
00520     ######### CONFIGURATION FILE ################
00521     cfgFile = "BeamSpotWorkflow.cfg"    
00522     if option.cfg:
00523         cfgFile = option.cfg
00524     configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
00525     configuration     = ConfigParser.ConfigParser()
00526     print 'Reading configuration from ', configurationFile
00527     configuration.read(configurationFile)
00528 
00529     sourceDir           = configuration.get('Common','SOURCE_DIR')
00530     archiveDir            = configuration.get('Common','ARCHIVE_DIR')
00531     workingDir            = configuration.get('Common','WORKING_DIR')
00532     databaseTag           = configuration.get('Common','DBTAG')
00533     dataSet               = configuration.get('Common','DATASET')
00534     fileIOVBase           = configuration.get('Common','FILE_IOV_BASE')
00535     dbIOVBase             = configuration.get('Common','DB_IOV_BASE')
00536     dbsTolerance          = float(configuration.get('Common','DBS_TOLERANCE'))
00537     dbsTolerancePercent   = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
00538     rrTolerance           = float(configuration.get('Common','RR_TOLERANCE'))
00539     missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
00540     missingLumisTimeout   = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
00541     mailList              = configuration.get('Common','EMAIL')
00542 
00543     ######### DIRECTORIES SETUP #################
00544     if sourceDir[len(sourceDir)-1] != '/':
00545         sourceDir = sourceDir + '/'
00546     if not dirExists(sourceDir):
00547         error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
00548         sendEmail(mailList,error)
00549         exit(error)
00550 
00551     if archiveDir[len(archiveDir)-1] != '/':
00552         archiveDir = archiveDir + '/'
00553     if not os.path.isdir(archiveDir):
00554         os.mkdir(archiveDir)
00555 
00556     if workingDir[len(workingDir)-1] != '/':
00557         workingDir = workingDir + '/'
00558     if not os.path.isdir(workingDir):
00559         os.mkdir(workingDir)
00560     else:
00561         os.system("rm -f "+ workingDir + "*") 
00562 
00563 
00564     print "Getting last IOV for tag: " + databaseTag
00565     lastUploadedIOV = 1
00566     if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT": 
00567         lastUploadedIOV = getLastUploadedIOV(databaseTag)
00568     else:
00569         lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)
00570         
00571     #lastUploadedIOV = 133885
00572     #lastUploadedIOV = 575216380019329
00573     if dbIOVBase == "lumiid":
00574         lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]
00575 
00576     ######### Get list of files processed after the last IOV  
00577     print "Getting list of files processed after IOV " + str(lastUploadedIOV)
00578     newProcessedRunList      = getNewRunList(sourceDir,lastUploadedIOV)
00579     if len(newProcessedRunList) == 0:
00580         exit("There are no new runs after " + str(lastUploadedIOV))
00581 
00582     ######### Copy files to archive directory
00583     print "Copying files to archive directory"
00584     copiedFiles = []
00585     for i in range(3):
00586         copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)    
00587         if len(copiedFiles) == len(newProcessedRunList):
00588             break;
00589     if len(copiedFiles) != len(newProcessedRunList):
00590         error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList)) 
00591         sendEmail(mailList,error)
00592         exit(error)
00593 
00594 
00595     ######### Get from DBS the list of files after last IOV    
00596     #listOfFilesToProcess = getListOfFilesToProcess(dataSet,lastUploadedIOV) 
00597     print "Getting list of files from DBS"
00598     listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
00599     if len(listOfRunsAndLumiFromDBS) == 0:
00600        exit("There are no files in DBS to process") 
00601     print "Getting list of files from RR"
00602     listOfRunsAndLumiFromRR  = getListOfRunsAndLumiFromRR(lastUploadedIOV) 
00603     ######### Get list of files to process for DB
00604     #selectedFilesToProcess = selectFilesToProcess(listOfFilesToProcess,copiedFiles)
00605     #completeProcessedRuns = removeUncompleteRuns(copiedFiles,dataSet)
00606     #print copiedFiles
00607     #print completeProcessedRuns
00608     #exit("complete")
00609     print "Getting list of files to process"
00610     selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
00611     if len(selectedFilesToProcess) == 0:
00612        exit("There are no files to process")
00613         
00614     #print selectedFilesToProcess
00615     ######### Copy files to working directory
00616     print "Copying files from archive to working directory"
00617     copiedFiles = []
00618     for i in range(3):
00619         copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)    
00620         if len(copiedFiles) == len(selectedFilesToProcess):
00621             break;
00622         else:
00623             commands.getstatusoutput("rm -rf " + workingDir)
00624     if len(copiedFiles) != len(selectedFilesToProcess):
00625         error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir 
00626         sendEmail(mailList,error)
00627         exit(error)
00628 
00629     print "Sorting and cleaning beamlist"
00630     beamSpotObjList = []
00631     for fileName in copiedFiles:
00632         readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)
00633 
00634     sortAndCleanBeamList(beamSpotObjList,fileIOVBase)
00635 
00636     if len(beamSpotObjList) == 0:
00637         error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
00638         exit(error)
00639 
00640     payloadFileName = "PayloadFile.txt"
00641 
00642     runBased = False
00643     if dbIOVBase == "runnumber":
00644         runBased = True
00645         
00646     payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
00647     if len(payloadList) == 0:
00648         error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
00649         exit(error)
00650        
00651 
00652     tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
00653     tmpSqliteFileName  = workingDir + "SingleTmpSqliteFile.db"
00654 
00655     writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
00656     readDBTemplate  = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
00657     payloadNumber = -1
00658     iovSinceFirst = '0';
00659     iovTillLast   = '0';
00660 
00661     #Creating the final name for the combined sqlite file
00662     uuid = commands.getstatusoutput('uuidgen -t')[1]
00663     final_sqlite_file_name = databaseTag + '@' + uuid
00664     sqlite_file     = workingDir + final_sqlite_file_name + ".db"
00665     metadata_file   = workingDir + final_sqlite_file_name + ".txt"
00666 
00667     for payload in payloadList:
00668         payloadNumber += 1
00669         if option.zlarge:
00670             payload.sigmaZ = 10
00671             payload.sigmaZerr = 2.5e-05
00672         tmpFile = file(tmpPayloadFileName,'w')
00673         dumpValues(payload,tmpFile)
00674         tmpFile.close()
00675         if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
00676             error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
00677             exit(error)
00678         readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)
00679         
00680         ##############################################################
00681         #WARNING I am not sure if I am packing the right values
00682         if dbIOVBase == "runnumber":
00683             iov_since = str(payload.Run)
00684             iov_till  = iov_since
00685         elif dbIOVBase == "lumiid":
00686             iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
00687             iov_till  = str( pack(int(payload.Run), int(payload.IOVlast)) )
00688         elif dbIOVBase == "timestamp":
00689             error = "ERROR: IOV " + dbIOVBase + " still not implemented."
00690             exit(error)
00691         else:
00692             error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
00693             exit(error)
00694 
00695         if payloadNumber == 0:
00696             iovSinceFirst = iov_since
00697         if payloadNumber == len(payloadList)-1:
00698             iovTillLast   = iov_till
00699             
00700         appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
00701         os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)
00702 
00703         
00704     #### CREATE payload for merged output
00705 
00706     print " create MERGED payload card for dropbox ..."
00707 
00708     dfile = open(metadata_file,'w')
00709 
00710     dfile.write('destDB '  + destDB        +'\n')
00711     dfile.write('tag '     + databaseTag   +'\n')
00712     dfile.write('inputtag'                 +'\n')
00713     dfile.write('since '   + iovSinceFirst +'\n')
00714     #dfile.write('till '    + iov_till      +'\n')
00715     dfile.write('Timetype '+ dbIOVBase     +'\n')
00716 
00717     ###################################################
00718     # WARNING tagType forced to offline
00719     print "WARNING TAG TYPE forced to be just offline"
00720     tagType = "offline"
00721     checkType = tagType
00722     if tagType == "express":
00723         checkType = "hlt"
00724     dfile.write('IOVCheck ' + checkType + '\n')
00725     dfile.write('usertext Beam spot position\n')
00726             
00727     dfile.close()
00728 
00729                                                                                                 
00730 
00731     if option.upload:
00732         print " scp files to offline Drop Box"
00733         dropbox = "/DropBox"
00734         if option.Test:
00735             dropbox = "/DropBox_test"
00736         print "UPLOADING TO TEST DB"
00737         uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)
00738                    
00739     archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
00740     archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
00741     if not os.path.isdir(archiveDir + 'payloads'):
00742         os.mkdir(archiveDir + 'payloads')
00743     commands.getstatusoutput('mv ' + sqlite_file   + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
00744     commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
00745     commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)
00746   
00747     print archiveDir + "payloads/" + archive_sqlite_file_name + '.db'
00748     print archiveDir + "payloads/" + archive_sqlite_file_name + '.txt'
00749 
00750     rmLock()
    
def BeamSpotWorkflow::removeUncompleteRuns (   newRunList,
  dataSet 
)

Definition at line 443 of file BeamSpotWorkflow.py.

00444 def removeUncompleteRuns(newRunList,dataSet):
00445     processedRuns = {}
00446     for fileName in newRunList:
00447         run = getRunNumberFromFileName(fileName)
00448         if not run in processedRuns:
00449             processedRuns[run] = 0
00450         processedRuns[run] += 1
00451 
00452     for run in processedRuns.keys():   
00453         nFiles = getNumberOfFilesToProcessForRun(dataSet,run)
00454         if processedRuns[run] < nFiles:
00455             print "I haven't processed all files yet : " + str(processedRuns[run]) + " out of " + str(nFiles) + " for run: " + str(run)
00456         else:
00457             print "All files have been processed for run: " + str(run) + " (" + str(processedRuns[run]) + " out of " + str(nFiles) + ")"
            
def BeamSpotWorkflow::selectFilesToProcess (   listOfRunsAndLumiFromDBS,
  listOfRunsAndLumiFromRR,
  newRunList,
  runListDir,
  dataSet,
  mailList,
  dbsTolerance,
  dbsTolerancePercent,
  rrTolerance,
  missingFilesTolerance,
  missingLumisTimeout 
)

# Build the list of file names (taken from newRunList) whose runs pass the
# completeness checks against DBS and the Run Registry.
#
# Step 1 - read the files: every file runListDir+fileName is scanned line by
#   line. A line containing "Runnumber" sets the current run (second
#   space-separated token); a line containing "LumiRange" gives the begin and
#   end lumi (tokens 1 and 3). Begin and end must be equal, otherwise exit()
#   is called with an error message.
# Step 2 - walk the Run Registry runs in ascending order:
#   * run was processed and is below the highest run number in DBS:
#       - the number of processed files is compared with
#         getNumberOfFilesToProcessForRun(); if files are missing, a
#         DBS_MISMATCH or DBS_VERY_BIG_MISMATCH timeout is handled through
#         timeoutManager() and the function may return early;
#       - the processed lumis are compared with DBS and, if some are missing,
#         with the Run Registry, using compareLumiLists(); missing lumis are
#         accepted only within dbsTolerancePercent or dbsTolerance.
#   * any other run: a MISSING_RUNREGRUN timeout is handled; after expiry the
#     run is ignored only if it has at most rrTolerance lumis.
# Every early return hands back the files collected so far, so no run after
# the first blocking run is ever included.
#
# timeoutManager() results as used here: 1 is treated as "timeout expired",
# -1 as "timeout just set", any other value as "timeout in progress".
#
# Parameters:
#   listOfRunsAndLumiFromDBS - dict keyed by run; the value is passed to
#                              compareLumiLists() as the DBS lumi list.
#   listOfRunsAndLumiFromRR  - dict keyed by run; each value is a sequence of
#                              lumi ranges, each either [] or indexable as
#                              [first, last] (both inclusive).
#   newRunList               - file names, relative to runListDir.
#   runListDir               - prefix concatenated directly with each file
#                              name (no separator is inserted here).
#   dataSet                  - comma-separated dataset names.
#   mailList                 - forwarded to sendEmail() for error reports.
#   dbsTolerance             - largest number of unprocessed lumis accepted.
#   dbsTolerancePercent      - largest percentage of unprocessed lumis
#                              (relative to the Run Registry list) accepted.
#   rrTolerance              - largest lumi count of a missing run that may
#                              be ignored after its timeout expired.
#   missingFilesTolerance    - largest number of missing files handled with
#                              the DBS_MISMATCH (rather than VERY_BIG) timeout.
#   missingLumisTimeout      - forwarded to timeoutManager(); presumably in
#                              seconds, since it is divided by 3600 and
#                              reported as hours - TODO confirm.
#
# Returns: list of file names from newRunList.

Definition at line 226 of file BeamSpotWorkflow.py.

00227                                                                                                                                                                                                         :
00228     runsAndLumisProcessed = {}  # run -> list of single lumi numbers read from the files
00229     runsAndFiles = {}  # run -> list of file names from newRunList
00230     for fileName in newRunList:
00231         file = open(runListDir+fileName)
00232         for line in file:
00233             if line.find("Runnumber") != -1:
00234                 run = long(line.replace('\n','').split(' ')[1])
00235             elif line.find("LumiRange") != -1:
00236                 lumiLine = line.replace('\n','').split(' ')
00237                 begLumi = long(lumiLine[1])
00238                 endLumi = long(lumiLine[3])
00239                 if begLumi != endLumi:
00240                     error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
00241                     exit(error)
00242                 else:
00243                     if not run in runsAndLumisProcessed:
00244                         runsAndLumisProcessed[run] = []
00245                     if begLumi in runsAndLumisProcessed[run]:
00246                         print "Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!"
00247                     else:    
00248                         runsAndLumisProcessed[run].append(begLumi)
00249         if not run in runsAndFiles:  # NOTE(review): 'run' is whatever the last "Runnumber" line set; a file without one would reuse the previous file's run - confirm inputs
00250             runsAndFiles[run] = []
00251         runsAndFiles[run].append(fileName)    
00252         file.close()
00253 
00254     rrKeys = listOfRunsAndLumiFromRR.keys()
00255     rrKeys.sort()
00256     dbsKeys = listOfRunsAndLumiFromDBS.keys()
00257     dbsKeys.sort()
00258     #I remove the last entry from DBS since I am not sure it is an already closed run!
00259     lastUnclosedRun = dbsKeys.pop()  # highest run in DBS; NOTE(review): raises IndexError if listOfRunsAndLumiFromDBS is empty
00260     #print "Last unclosed run: " + str(lastUnclosedRun)
00261     procKeys = runsAndLumisProcessed.keys()
00262     procKeys.sort()
00263     #print "Run Registry:"    
00264     #print rrKeys
00265     #print "DBS:"    
00266     #print dbsKeys
00267     #print "List:"    
00268     #print procKeys
00269     #print lastUnclosedRun
00270     filesToProcess = []
00271     for run in rrKeys:
00272         RRList = []
00273         for lumiRange in listOfRunsAndLumiFromRR[run]:
00274             if lumiRange != []: 
00275                 for l in range(lumiRange[0],lumiRange[1]+1):  # expand the inclusive [first,last] range into single lumis
00276                     RRList.append(long(l))
00277         if run in procKeys and run < lastUnclosedRun:  # processed run that is below the highest run in DBS
00278             #print "run " + str(run) + " is in procKeys"
00279             if not run in dbsKeys and run != lastUnclosedRun:  # second condition is already implied by run < lastUnclosedRun above
00280                 error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!" 
00281                 exit(error)
00282             print "Working on run " + str(run)
00283             nFiles = 0
00284             for data in dataSet.split(','):  # use the first dataset that reports a non-zero file count
00285                 nFiles = getNumberOfFilesToProcessForRun(data,run)
00286                 if nFiles != 0:
00287                     break
00288             if len(runsAndFiles[run]) < nFiles:
00289                 print "I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run) 
00290                 if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:  # few files missing: use the DBS_MISMATCH timeout
00291                     timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
00292                     timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
00293                     if timeoutType == 1:  # expired: go on to the lumi checks below with the files at hand
00294                         print "WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!"
00295                     else:
00296                         if timeoutType == -1:
00297                             print "WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
00298                         else:
00299                             print "WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress."
00300                         return filesToProcess  # stop here; only files of earlier runs are returned
00301                 else:
00302                     timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
00303                     if timeoutType == 1:
00304                         error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
00305                         sendEmail(mailList,error)
00306                         return filesToProcess
00307                         #exit(error)
00308                     else:
00309                         if timeoutType == -1:
00310                             print "WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
00311                         else:
00312                             print "WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress."
00313                         return filesToProcess
00314                     
00315             else:
00316                 timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))  # one-argument calls, like the "resetting this timeout" call above
00317                 timeoutManager("DBS_MISMATCH_Run"+str(run))
00318                 print "I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!" 
00319             errors          = []
00320             badProcessed    = []
00321             badDBSProcessed = []
00322             badDBS          = []
00323             badRRProcessed  = []
00324             badRR           = []
00325             #It is important for runsAndLumisProcessed[run] to be the first because the comparison is not ==
00326             badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
00327             for i in range(0,len(errors)):
00328                 errors[i] = errors[i].replace("listA","the processed lumis")
00329                 errors[i] = errors[i].replace("listB","DBS")
00330             #print errors
00331             #print badProcessed
00332             #print badDBS
00333             #exit("ciao")
00334             if len(badDBS) != 0:
00335                 print "This is weird because I processed more lumis than the ones that are in DBS!"
00336             if len(badDBSProcessed) != 0 and run in rrKeys:  # 'run in rrKeys' always holds inside this loop over rrKeys
00337                 lastError = len(errors)  # NOTE(review): not read again anywhere in this function
00338                 #print RRList            
00339                 #It is important for runsAndLumisProcessed[run] to be the first because the comparison is not ==
00340                 badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
00341                 for i in range(0,len(errors)):  # re-scans every entry; those relabelled in the DBS pass no longer contain listA/listB
00342                     errors[i] = errors[i].replace("listA","the processed lumis")
00343                     errors[i] = errors[i].replace("listB","Run Registry")
00344                 #print errors
00345                 #print badProcessed
00346                 #print badRunRegistry
00347                     
00348                 if len(badRRProcessed) != 0:    
00349                     print "I have not processed some of the lumis that are in the run registry for run: " + str(run)
00350                     for lumi in badDBSProcessed:  # keep only lumis reported in both the DBS and the Run Registry comparison
00351                         if lumi in badRRProcessed:
00352                             badProcessed.append(lumi)
00353                     lenA = len(badProcessed)
00354                     lenB = len(RRList)
00355                     if 100.*lenA/lenB <= dbsTolerancePercent:  # the percentage tolerance is tested first, then the absolute one
00356                         print "WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
00357                         #print errors
00358                         badProcessed = []
00359                     elif lenA <= dbsTolerance:
00360                         print "WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
00361                         #print errors
00362                         badProcessed = []
00363                     else:    
00364                         error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
00365                         sendEmail(mailList,error)
00366                         print error
00367                         return filesToProcess
00368                         #exit(errors)
00369                     #return filesToProcess
00370                 elif len(errors) != 0:
00371                     print "The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!"
00372                     #print errors
00373 
00374             #If I get here it means that I passed either the DBS or the RR test
00375             if len(badProcessed) == 0:
00376                 for file in runsAndFiles[run]:
00377                     filesToProcess.append(file)
00378             else:
00379                 #print errors
00380                 print "This should never happen because if I have errors I return or exit! Run: " + str(run)
00381         else:  # also reached when the run was processed but is not below lastUnclosedRun
00382             error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
00383             print error
00384             timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
00385             if timeoutType == 1:
00386                 if len(RRList) <= rrTolerance:  # expired: a missing run is skipped only if it is small enough
00387                     error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
00388                     #print listOfRunsAndLumiFromRR[run]
00389                     print error
00390                     #sendEmail(mailList,error)
00391                 else:
00392                     error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
00393                     sendEmail(mailList,error)
00394                     return filesToProcess
00395                     #exit(error)
00396             else:
00397                 if timeoutType == -1:
00398                     print "WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!"
00399                 else:
00400                     print "WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress."
00401                 return filesToProcess
00402                     
    return filesToProcess

Variable Documentation

string BeamSpotWorkflow::error = "Please set a crab environment in order to get the proper JSON lib"

Definition at line 50 of file BeamSpotWorkflow.py.