CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/RecoVertex/BeamSpotProducer/scripts/BeamSpotWorkflow.py

Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #____________________________________________________________
00003 #
00004 #  BeamSpotWorkflow
00005 #
00006 # A very complicated way to automate the beam spot workflow
00007 #
00008 # Francisco Yumiceva, Lorenzo Uplegger
00009 # yumiceva@fnal.gov, uplegger@fnal.gov
00010 #
00011 # Fermilab, 2010
00012 #
00013 #____________________________________________________________
00014 
00015 """
00016    BeamSpotWorkflow.py
00017 
00018    A very complicate script to upload the results into the DB
00019 
00020    usage: %prog -d <data file/directory> -t <tag name>
00021    -c, --cfg = CFGFILE : Use a different configuration file than the default
00022    -l, --lock = LOCK   : Create a lock file to have just one script running 
00023    -o, --overwrite     : Overwrite results files when copying.
00024    -T, --Test          : Upload files to Test dropbox for data validation.   
00025    -u, --upload        : Upload files to offline drop box via scp.
00026    -z, --zlarge        : Enlarge sigmaZ to 10 +/- 0.005 cm.
00027 
00028    Francisco Yumiceva (yumiceva@fnal.gov)
00029    Lorenzo Uplegger   (send an email to Francisco)
00030    Fermilab 2010
00031    
00032 """
00033 
00034 
00035 import sys,os
00036 import commands, re, time
00037 import datetime
00038 import ConfigParser
00039 import xmlrpclib
00040 from BeamSpotObj import BeamSpot
00041 from IOVObj import IOV
00042 from CommonMethods import *
00043 
00044 try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
00045     import json
00046 except:
00047     try:
00048         import simplejson as json
00049     except:
00050         error = "Please set a crab environment in order to get the proper JSON lib"
00051         exit(error)
00052 
00053 #####################################################################################
00054 # General functions
00055 #####################################################################################
def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
    """Return the last IOV listed by cmscond_list_iov for tagName in destDB.

    Returns 1 when the command fails because the tag's metadata entry does
    not exist. Any other command failure, or an empty IOV value, terminates
    the script through exit().
    """
    listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName

    # First run the plain command: its exit status tells us whether the DB
    # could be reached at all (the piped command below would hide it).
    status, listing = commands.getstatusoutput(listIOVCommand)
    if status != 0:
        missingTagMessage = 'metadata entry "' + tagName + '" does not exist'
        if listing.find(missingTagMessage) == -1:
            exit("ERROR: Can't connect to db because:\n" + listing)
        print("Creating a new tag because I got the following error contacting the DB")
        print(listing)
        return 1

    # Keep the first column of the last line that contains "DB=".
    lastIOVCommand = listIOVCommand + " | grep DB= | tail -1 | awk '{print $1}'"
    lastIOV = commands.getstatusoutput(lastIOVCommand)[1]

    #WARNING when we pass to lumi IOV this should be long long
    if lastIOV == '':
        exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")

    return long(lastIOV)
00078 
00079 ########################################################################
def getListOfFilesToProcess(dataSet,lastRun=-1):
    """Query DBS for the files of dataSet and return the output lines.

    When lastRun is not -1 the query is restricted to runs greater than
    lastRun. The command output is filtered with "grep .root" and split on
    newlines; the exit status of the command is not inspected.
    """
    selection = "find file where dataset=" + dataSet
    if lastRun != -1:
        selection += " and run > " + str(lastRun)
    queryCommand = "dbs --search --query \"" + selection + "\" | grep .root"
    listing = commands.getstatusoutput(queryCommand)[1]
    return listing.split('\n')
00088 
00089 ########################################################################
def getNumberOfFilesToProcessForRun(dataSet,run):
    """Return how many output lines DBS lists for dataSet and run.

    The DBS output is filtered with "grep .root"; a non-zero exit status of
    the whole pipeline yields 0.
    """
    selection = "find file where dataset=" + dataSet + " and run = " + str(run)
    queryCommand = "dbs --search --query \"" + selection + "\" | grep .root"
    status, listing = commands.getstatusoutput(queryCommand)
    if status == 0:
        return len(listing.split('\n'))
    return 0
00098 
00099 ########################################################################
def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
    """Return {run: [lumi, ...]} as reported by DBS for the given dataset(s).

    dataSet may hold several comma-separated dataset names; the results of
    all of them are merged into one dictionary. When lastRun is not -1 only
    runs greater than lastRun are requested.

    Each query is attempted up to 3 times, until the command succeeds and
    its output contains neither "ERROR" nor "Error". If the last attempt
    still has a non-zero exit status the script terminates through exit().
    If it succeeded but still contains an error marker, a warning is printed
    and whatever run/lumi pairs are present in the output are used.
    """
    runLumiPattern = re.compile(r'(\d+)\s+(\d+)')
    maxAttempts = 3
    runsAndLumis = {}
    for data in dataSet.split(','):
        queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
        if lastRun != -1:
            queryCommand = queryCommand + " and run > " + str(lastRun)
        queryCommand += "\""
        print(" >> " + queryCommand)

        for attempt in range(maxAttempts):
            status, dbsOutput = commands.getstatusoutput(queryCommand)
            reportedError = dbsOutput.find("ERROR") != -1 or dbsOutput.find("Error") != -1
            if status == 0 and not reportedError:
                break
        if status != 0:
            exit("ERROR: I can't contact DBS for the following reason:\n" + dbsOutput)
        if reportedError:
            # Previously this case was silently ignored after the retries.
            print("WARNING: DBS still reported an error after " + str(maxAttempts) + " attempts, the list of runs and lumis may be incomplete:\n" + dbsOutput)

        # Every line holding "<digits> <whitespace> <digits>" is a run/lumi pair.
        for line in dbsOutput.split('\n'):
            match = runLumiPattern.search(line)
            if match:
                run  = long(match.group(1))
                lumi = long(match.group(2))
                runsAndLumis.setdefault(run, []).append(lumi)

    return runsAndLumis
00133 
00134 #####################################################################################
def getListOfRunsAndLumiFromFile(firstRun=-1,fileName=""):
    """Load a JSON file mapping run numbers to lumi data.

    Returns a dictionary with the same values as the JSON object but with
    the keys converted to long. firstRun is accepted for symmetry with
    getListOfRunsAndLumiFromRR but is not used: no run is filtered out.
    """
    jsonFile = open(fileName)
    try:
        # The handle is released even when reading or parsing fails.
        jsonList = json.loads(jsonFile.read())
    finally:
        jsonFile.close()

    selected_dcs = {}
    for element in jsonList:
        selected_dcs[long(element)] = jsonList[element]
    return selected_dcs
00145 
00146 ########################################################################
def _exportFromRunRegistry(server,table,template,selection,retryMessage,maxAttempts=3):
    """Call server.DataExporter.export, retrying after a 2 s pause on failure.

    Returns the exported data, or None when all maxAttempts attempts raised.
    """
    for tries in range(maxAttempts):
        try:
            return server.DataExporter.export(table, 'GLOBAL', template, selection)
        except Exception:
            # Exception (not a bare except) so Ctrl-C and exit() still work.
            print(retryMessage + " " + str(tries) + " / " + str(maxAttempts))
            time.sleep(2)
    return None

def getListOfRunsAndLumiFromRR(firstRun=-1):
    """Return {run: lumi ranges} from the Run Registry for runs > firstRun.

    Two XML-RPC exports are made against the "Collisions10" group: the list
    of runs (CSV), then the lumi sections of those runs for which all the
    DCS flags in the selection are 1 (JSON). Runs that are in the run list
    but missing from the lumi-section export get the value [[]].

    Returns an empty dictionary when the Run Registry cannot be reached
    after 3 attempts or when it reports no run at all.
    """
    RunReg  ="https://pccmsdqm04.cern.ch/runregistry"
    Group   = "Collisions10"

    # get handler to RR XML-RPC server
    server = xmlrpclib.ServerProxy(RunReg + "/xmlrpc")
    sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun)

    run_data = _exportFromRunRegistry(server, 'RUN', 'csv_runs', sel_runtable,
                                      "Something wrong in accessing runregistry, retrying in 2s....")
    if run_data is None:
        return {}

    # The run number is the first CSV column; non-numeric rows are skipped.
    listOfRuns=[]
    for line in run_data.split("\n"):
        run=line.split(',')[0]
        if run.isdigit():
            listOfRuns.append(run)

    if not listOfRuns:
        # Indexing the empty list below used to raise IndexError.
        print("WARNING: The run registry returned no runs > " + str(firstRun))
        return {}

    # NOTE(review): assumes the export lists runs in descending order, so the
    # last entry is the lowest run and the first the highest -- confirm.
    firstRun = listOfRuns[-1]
    lastRun  = listOfRuns[0]
    sel_dcstable="{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"

    dcs_data = _exportFromRunRegistry(server, 'RUNLUMISECTION', 'json', sel_dcstable,
                                      "I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....")
    if dcs_data is None:
        return {}

    selected_dcs={}
    jsonList=json.loads(dcs_data)

    for element in listOfRuns:
        if element in jsonList:
            selected_dcs[long(element)]=jsonList[element]
        else:
            print("WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!")
            selected_dcs[long(element)]= [[]]
    return selected_dcs
00215 
00216 ########################################################################
def getLastClosedRun(DBSListOfFiles):
    """Return the second highest run number found in the DBS file names.

    The highest run is considered still open. Returns -1 when fewer than two
    distinct run numbers are found.
    """
    distinctRuns = []
    for dbsName in DBSListOfFiles:
        runNumber = getRunNumberFromDBSName(dbsName)
        if runNumber not in distinctRuns:
            distinctRuns.append(runNumber)

    if len(distinctRuns) > 1:
        return long(sorted(distinctRuns)[-2])
    return -1 #No closed run
00229     
00230 ########################################################################
# "<non-digits>_<digits>_<digits>_": the second group of digits is the run.
_RUN_IN_FILE_NAME = re.compile(r'(\D+)_(\d+)_(\d+)_')

def getRunNumberFromFileName(fileName):
    """Return the run number embedded in a result file name, or -1.

    The run number is the second of two consecutive underscore-delimited
    digit groups that follow a non-digit prefix and are themselves followed
    by an underscore. -1 is returned when fileName has no such sequence.
    """
    match = _RUN_IN_FILE_NAME.search(fileName)
    if not match:
        return -1
    return long(match.group(3))
00237 
00238 ########################################################################
# "<non-digits>/<digits>/<digits>/<digits>/<non-digits>"
_RUN_IN_DBS_NAME = re.compile(r'(\D+)/(\d+)/(\d+)/(\d+)/(\D+)')

def getRunNumberFromDBSName(fileName):
    """Return the run number encoded in a DBS file path, or -1.

    The path must contain three consecutive all-digit directories; the run
    number is the concatenation of the second and the third one (for example
    ".../000/136/035/file.root" gives 136035). -1 is returned when the path
    does not match.

    NOTE: the character right before the first "/" of the numeric triple has
    to be a non-digit, so a parent directory ending in a digit (e.g. "v4")
    makes the match fail.
    """
    match = _RUN_IN_DBS_NAME.search(fileName)
    if not match:
        return -1
    return long(match.group(3) + match.group(4))
00244     
00245 ########################################################################
def getNewRunList(fromDir,lastUploadedIOV):
    """Return the ".txt" files of fromDir whose run number is above lastUploadedIOV.

    The files are listed with ls(fromDir,".txt") and the run number is taken
    from each file name with getRunNumberFromFileName.
    """
    return [candidate
            for candidate in ls(fromDir,".txt")
            if getRunNumberFromFileName(candidate) > lastUploadedIOV]
00255 
00256 ########################################################################
def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
    """Select the result files whose runs are complete enough to be uploaded.

    The run/lumi content of the result files is cross-checked against DBS
    and the Run Registry (RR). RR runs are visited in increasing order and
    the function returns as soon as one run cannot be accepted, so the
    returned list only contains files of the runs before that one.

    Parameters:
      listOfRunsAndLumiFromDBS -- {run: [lumi, ...]} from DBS. The highest
                                  run is popped and treated as still open.
      listOfRunsAndLumiFromRR  -- {run: [[firstLumi, lastLumi], ...]} from RR;
                                  an empty range ([]) is skipped.
      newRunList               -- names of the result files to inspect.
      runListDir               -- directory prefix prepended to each name.
      dataSet                  -- comma-separated dataset names used to count
                                  the DBS files of a run.
      mailList                 -- passed to sendEmail with the error reports.
      dbsTolerance             -- max number of unprocessed lumis tolerated.
      dbsTolerancePercent      -- max percentage (of the RR lumis) of
                                  unprocessed lumis tolerated.
      rrTolerance              -- a missing RR run with at most this many
                                  lumis is ignored once its timeout expired.
      missingFilesTolerance    -- max number of missing files for which the
                                  short DBS_MISMATCH timeout is used.
      missingLumisTimeout      -- duration handed to timeoutManager.

    Returns the list of file names to process. Calls exit() on malformed
    input files or on a processed RR run that is absent from DBS.

    NOTE(review): judging by the messages printed below, timeoutManager
    returns 1 when a previously set timeout has expired, -1 when it has just
    been set and another value while it is running; called with only a name
    it resets the timeout. Confirm against CommonMethods.
    """
    runsAndLumisProcessed = {}
    runsAndFiles = {}
    # Read the run number and the lumi sections out of every result file.
    for fileName in newRunList:
        file = open(runListDir+fileName)
        for line in file:
            if line.find("Runnumber") != -1:
                run = long(line.replace('\n','').split(' ')[1])
            elif line.find("LumiRange") != -1:
                lumiLine = line.replace('\n','').split(' ')
                begLumi = long(lumiLine[1])
                endLumi = long(lumiLine[3])
                # Only single-lumi ranges are supported.
                if begLumi != endLumi:
                    error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
                    exit(error)
                else:
                    if not run in runsAndLumisProcessed:
                        runsAndLumisProcessed[run] = []
                    if begLumi in runsAndLumisProcessed[run]:
                        print "Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!"
                    else:    
                        runsAndLumisProcessed[run].append(begLumi)
        # The file is filed under the last run number read from it.
        # NOTE(review): if a file has no "Runnumber" line, run keeps the value
        # from the previous file (or is undefined for the first file).
        if not run in runsAndFiles:
            runsAndFiles[run] = []
        runsAndFiles[run].append(fileName)    
        file.close()

    rrKeys = listOfRunsAndLumiFromRR.keys()
    rrKeys.sort()
    dbsKeys = listOfRunsAndLumiFromDBS.keys()
    dbsKeys.sort()
    #I remove the last entry from DBS since I am not sure it is an already closed run!
    # NOTE(review): pop() raises IndexError if the DBS dictionary is empty.
    lastUnclosedRun = dbsKeys.pop()
    #print "Last unclosed run: " + str(lastUnclosedRun)
    procKeys = runsAndLumisProcessed.keys()
    procKeys.sort()
    #print "Run Registry:"    
    #print rrKeys
    #print "DBS:"    
    #print dbsKeys
    #print "List:"    
    #print procKeys
    #print lastUnclosedRun
    filesToProcess = []
    for run in rrKeys:
        # Expand the RR lumi ranges of this run into a flat list of lumis.
        RRList = []
        for lumiRange in listOfRunsAndLumiFromRR[run]:
            if lumiRange != []: 
                for l in range(lumiRange[0],lumiRange[1]+1):
                    RRList.append(long(l))
        if run in procKeys and run < lastUnclosedRun:
            #print "run " + str(run) + " is in procKeys"
            if not run in dbsKeys and run != lastUnclosedRun:
                error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!" 
                exit(error)
            print "Working on run " + str(run)
            # Number of DBS files for this run: the first dataset with a
            # non-zero count wins.
            nFiles = 0
            for data in dataSet.split(','):
                nFiles = getNumberOfFilesToProcessForRun(data,run)
                if nFiles != 0:
                    break
            if len(runsAndFiles[run]) < nFiles:
                print "I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run) 
                if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
                    # Few files missing: wait for them until the timeout
                    # expires, then go on with what is available.
                    timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
                    timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        print "WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!"
                    else:
                        if timeoutType == -1:
                            print "WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
                        else:
                            print "WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress."
                        return filesToProcess
                else:
                    # Too many files missing: this run is never accepted; an
                    # email is sent once the timeout has expired.
                    timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
                        sendEmail(mailList,error)
                        return filesToProcess
                        #exit(error)
                    else:
                        if timeoutType == -1:
                            print "WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
                        else:
                            print "WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress."
                        return filesToProcess
                    
            else:
                # All files are there: reset both timeouts of this run.
                timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
                timeoutManager("DBS_MISMATCH_Run"+str(run))
                print "I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!" 
            errors          = []
            badProcessed    = []
            badDBSProcessed = []
            badDBS          = []
            badRRProcessed  = []
            badRR           = []
            #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
            # First result: lumis in DBS that were not processed; second
            # result: processed lumis that are not in DBS.
            badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
            for i in range(0,len(errors)):
                errors[i] = errors[i].replace("listA","the processed lumis")
                errors[i] = errors[i].replace("listB","DBS")
            #print errors
            #print badProcessed
            #print badDBS
            #exit("ciao")
            if len(badDBS) != 0:
                print "This is weird because I processed more lumis than the ones that are in DBS!"
            # Lumis missing with respect to DBS only matter if the Run
            # Registry lists them as well.
            if len(badDBSProcessed) != 0 and run in rrKeys:
                lastError = len(errors)
                #print RRList            
                #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
                badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
                for i in range(0,len(errors)):
                    errors[i] = errors[i].replace("listA","the processed lumis")
                    errors[i] = errors[i].replace("listB","Run Registry")
                #print errors
                #print badProcessed
                #print badRunRegistry
                    
                if len(badRRProcessed) != 0:    
                    print "I have not processed some of the lumis that are in the run registry for run: " + str(run)
                    # Keep the lumis that are missing from both comparisons.
                    for lumi in badDBSProcessed:
                        if lumi in badRRProcessed:
                            badProcessed.append(lumi)
                    lenA = len(badProcessed)
                    lenB = len(RRList)
                    # Accept the run if the missing lumis are within the
                    # percentage tolerance or within the absolute tolerance.
                    if 100.*lenA/lenB <= dbsTolerancePercent:
                        print "WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
                        #print errors
                        badProcessed = []
                    elif lenA <= dbsTolerance:
                        print "WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
                        #print errors
                        badProcessed = []
                    else:    
                        error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
                        sendEmail(mailList,error)
                        print error
                        return filesToProcess
                        #exit(errors)
                    #return filesToProcess
                elif len(errors) != 0:
                    print "The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!"
                    #print errors

            #If I get here it means that I passed or the DBS or the RR test            
            if len(badProcessed) == 0:
                for file in runsAndFiles[run]:
                    filesToProcess.append(file)
            else:
                #print errors
                print "This should never happen because if I have errors I return or exit! Run: " + str(run)
        else:
            # The run is in the Run Registry but either has no result file
            # or is not below the last (possibly still open) DBS run.
            error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
            print error
            timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
            if timeoutType == 1:
                # Timeout expired: small runs are skipped, bigger ones stop
                # the selection here.
                if len(RRList) <= rrTolerance:
                    error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
                    #print listOfRunsAndLumiFromRR[run]
                    print error
                    #sendEmail(mailList,error)
                else:
                    error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
                    sendEmail(mailList,error)
                    return filesToProcess
                    #exit(error)
            else:
                if timeoutType == -1:
                    print "WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!"
                else:
                    print "WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress."
                return filesToProcess
                    
    return filesToProcess
00434 ########################################################################
def compareLumiLists(listA,listB,errors=None,tolerance=0):
    """Compare two lists of lumi sections and report the differences.

    Parameters:
      listA, listB -- lists of lumi numbers; both are sorted IN PLACE.
      errors       -- optional list to which the error messages are
                      appended; a fresh list is used when omitted.
      tolerance    -- percentage of len(listB) by which listA may be shorter
                      before the "number of lumi sections is different"
                      error is reported.

    Returns (badA, badB): badA holds the lumis of listB that are missing
    from listA and badB the lumis of listA that are missing from listB.
    """
    # A mutable default would be shared, and keep growing, across calls.
    if errors is None:
        errors = []

    lenA = len(listA)
    lenB = len(listB)
    if lenA < lenB-(lenB*float(tolerance)/100):
        errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")

    listA.sort()
    listB.sort()
    # Sets make each membership test O(1) instead of scanning the other list.
    lumisInA = set(listA)
    lumisInB = set(listB)

    badA = []
    badB = []
    for lumi in listA:
        if lumi not in lumisInB:
            errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
            badB.append(lumi)
    for lumi in listB:
        if lumi not in lumisInA:
            errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
            badA.append(lumi)

    return badA,badB
00472 
00473 ########################################################################
def removeUncompleteRuns(newRunList,dataSet):
    """Print, for each run found in newRunList, whether all its files are processed.

    The number of result files per run is compared with the number of files
    reported by getNumberOfFilesToProcessForRun for dataSet. Nothing is
    removed and nothing is returned: the function only prints a report.
    """
    filesPerRun = {}
    for resultFile in newRunList:
        runNumber = getRunNumberFromFileName(resultFile)
        filesPerRun[runNumber] = filesPerRun.get(runNumber, 0) + 1

    for runNumber in filesPerRun:
        nProcessed = filesPerRun[runNumber]
        nFiles = getNumberOfFilesToProcessForRun(dataSet,runNumber)
        if nProcessed >= nFiles:
            print("All files have been processed for run: " + str(runNumber) + " (" + str(nProcessed) + " out of " + str(nFiles) + ")")
        else:
            print("I haven't processed all files yet : " + str(nProcessed) + " out of " + str(nFiles) + " for run: " + str(runNumber))
00488             
00489 ########################################################################
def aselectFilesToProcess(listOfFilesToProcess,newRunList):
    """Return the result files of the closed runs that are fully processed.

    Parameters:
      listOfFilesToProcess -- DBS file names; the run of each one is taken
                              with getRunNumberFromDBSName.
      newRunList           -- result file names; the run of each one is
                              taken with getRunNumberFromFileName.

    For every processed run not above getLastClosedRun(listOfFilesToProcess)
    the number of result files must equal the number of DBS files; the
    files of such runs are returned. The script terminates through exit()
    when a processed run is unknown to DBS or when the two counts differ.
    """
    selectedFiles = []
    runsToProcess = {}
    processedRuns = {}
    for dbsName in listOfFilesToProcess:
        run = getRunNumberFromDBSName(dbsName)
        runsToProcess[run] = runsToProcess.get(run, 0) + 1

    for resultFile in newRunList:
        run = getRunNumberFromFileName(resultFile)
        processedRuns[run] = processedRuns.get(run, 0) + 1

    #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
    lastClosedRun = getLastClosedRun(listOfFilesToProcess)

    processedRunsKeys = processedRuns.keys()
    processedRunsKeys.sort()

    for run in processedRunsKeys:
        if run > lastClosedRun:
            continue
        # This check has to come before runsToProcess[run] is read: it used
        # to come after, so a KeyError pre-empted the intended message.
        if not run in runsToProcess:
            exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
        print("For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!")
        # NOTE(review): the result is not used and getDBSLumiListForRun is not
        # defined in the visible part of this file -- presumably it comes
        # from CommonMethods; confirm before removing the call.
        lumiList = getDBSLumiListForRun(run)
        if processedRuns[run] != runsToProcess[run]:
            exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
        for resultFile in newRunList:
            if run == getRunNumberFromFileName(resultFile):
                selectedFiles.append(resultFile)
    return selectedFiles
00530 
00531 ########################################################################
00532 def main():
00533     ######### COMMAND LINE OPTIONS ##############
00534     option,args = parse(__doc__)
00535 
00536     ######### Check if there is already a megascript running ########
00537     if option.lock:
00538         setLockName('.' + option.lock)
00539         if checkLock():
00540             print "There is already a megascript runnning...exiting"
00541             return
00542         else:
00543             lock()
00544             
00545 
00546     destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
00547     if option.Test:
00548         destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'
00549 
00550     ######### CONFIGURATION FILE ################
00551     cfgFile = "BeamSpotWorkflow.cfg"    
00552     if option.cfg:
00553         cfgFile = option.cfg
00554     configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
00555     configuration     = ConfigParser.ConfigParser()
00556     print 'Reading configuration from ', configurationFile
00557     configuration.read(configurationFile)
00558 
00559     sourceDir             = configuration.get('Common','SOURCE_DIR')
00560     archiveDir            = configuration.get('Common','ARCHIVE_DIR')
00561     workingDir            = configuration.get('Common','WORKING_DIR')
00562     databaseTag           = configuration.get('Common','DBTAG')
00563     dataSet               = configuration.get('Common','DATASET')
00564     fileIOVBase           = configuration.get('Common','FILE_IOV_BASE')
00565     dbIOVBase             = configuration.get('Common','DB_IOV_BASE')
00566     dbsTolerance          = float(configuration.get('Common','DBS_TOLERANCE'))
00567     dbsTolerancePercent   = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
00568     rrTolerance           = float(configuration.get('Common','RR_TOLERANCE'))
00569     missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
00570     missingLumisTimeout   = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
00571     jsonFileName          = configuration.get('Common','JSON_FILE')
00572     mailList              = configuration.get('Common','EMAIL')
00573 
00574     ######### DIRECTORIES SETUP #################
00575     if sourceDir[len(sourceDir)-1] != '/':
00576         sourceDir = sourceDir + '/'
00577     if not dirExists(sourceDir):
00578         error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
00579         sendEmail(mailList,error)
00580         exit(error)
00581 
00582     if archiveDir[len(archiveDir)-1] != '/':
00583         archiveDir = archiveDir + '/'
00584     if not os.path.isdir(archiveDir):
00585         os.mkdir(archiveDir)
00586 
00587     if workingDir[len(workingDir)-1] != '/':
00588         workingDir = workingDir + '/'
00589     if not os.path.isdir(workingDir):
00590         os.mkdir(workingDir)
00591     else:
00592         os.system("rm -f "+ workingDir + "*") 
00593 
00594 
00595     print "Getting last IOV for tag: " + databaseTag
00596     lastUploadedIOV = 1
00597     if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT": 
00598         lastUploadedIOV = getLastUploadedIOV(databaseTag)
00599     else:
00600         lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)
00601         
00602     #lastUploadedIOV = 133885
00603     #lastUploadedIOV = 575216380019329
00604     if dbIOVBase == "lumiid":
00605         lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]
00606 
00607     ######### Get list of files processed after the last IOV  
00608     print "Getting list of files processed after IOV " + str(lastUploadedIOV)
00609     newProcessedRunList      = getNewRunList(sourceDir,lastUploadedIOV)
00610     if len(newProcessedRunList) == 0:
00611         exit("There are no new runs after " + str(lastUploadedIOV))
00612 
00613     ######### Copy files to archive directory
00614     print "Copying files to archive directory"
00615     copiedFiles = []
00616     for i in range(3):
00617         copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)    
00618         if len(copiedFiles) == len(newProcessedRunList):
00619             break;
00620     if len(copiedFiles) != len(newProcessedRunList):
00621         error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList)) 
00622         sendEmail(mailList,error)
00623         exit(error)
00624 
00625 
00626     ######### Get from DBS the list of files after last IOV    
00627     #listOfFilesToProcess = getListOfFilesToProcess(dataSet,lastUploadedIOV) 
00628     print "Getting list of files from DBS"
00629     listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
00630     if len(listOfRunsAndLumiFromDBS) == 0:
00631        exit("There are no files in DBS to process") 
00632     print "Getting list of files from RR"
00633     listOfRunsAndLumiFromRR  = getListOfRunsAndLumiFromRR(lastUploadedIOV) 
00634     if(not listOfRunsAndLumiFromRR):
00635         print "Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName
00636         listOfRunsAndLumiFromRR  = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName) 
00637     ######### Get list of files to process for DB
00638     #selectedFilesToProcess = selectFilesToProcess(listOfFilesToProcess,copiedFiles)
00639     #completeProcessedRuns = removeUncompleteRuns(copiedFiles,dataSet)
00640     #print copiedFiles
00641     #print completeProcessedRuns
00642     #exit("complete")
00643     print "Getting list of files to process"
00644     selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
00645     if len(selectedFilesToProcess) == 0:
00646        exit("There are no files to process")
00647         
00648     #print selectedFilesToProcess
00649     ######### Copy files to working directory
00650     print "Copying files from archive to working directory"
00651     copiedFiles = []
00652     for i in range(3):
00653         copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)    
00654         if len(copiedFiles) == len(selectedFilesToProcess):
00655             break;
00656         else:
00657             commands.getstatusoutput("rm -rf " + workingDir)
00658     if len(copiedFiles) != len(selectedFilesToProcess):
00659         error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir 
00660         sendEmail(mailList,error)
00661         exit(error)
00662 
00663     print "Sorting and cleaning beamlist"
00664     beamSpotObjList = []
00665     for fileName in copiedFiles:
00666         readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)
00667 
00668     sortAndCleanBeamList(beamSpotObjList,fileIOVBase)
00669 
00670     if len(beamSpotObjList) == 0:
00671         error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
00672         exit(error)
00673 
00674     payloadFileName = "PayloadFile.txt"
00675 
00676     runBased = False
00677     if dbIOVBase == "runnumber":
00678         runBased = True
00679         
00680     payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
00681     if len(payloadList) == 0:
00682         error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
00683         exit(error)
00684        
00685 
00686     tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
00687     tmpSqliteFileName  = workingDir + "SingleTmpSqliteFile.db"
00688 
00689     writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
00690     readDBTemplate  = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
00691     payloadNumber = -1
00692     iovSinceFirst = '0';
00693     iovTillLast   = '0';
00694 
00695     #Creating the final name for the combined sqlite file
00696     uuid = commands.getstatusoutput('uuidgen -t')[1]
00697     final_sqlite_file_name = databaseTag + '@' + uuid
00698     sqlite_file     = workingDir + final_sqlite_file_name + ".db"
00699     metadata_file   = workingDir + final_sqlite_file_name + ".txt"
00700 
00701     for payload in payloadList:
00702         payloadNumber += 1
00703         if option.zlarge:
00704             payload.sigmaZ = 10
00705             payload.sigmaZerr = 2.5e-05
00706         tmpFile = file(tmpPayloadFileName,'w')
00707         dumpValues(payload,tmpFile)
00708         tmpFile.close()
00709         if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
00710             error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
00711             exit(error)
00712         readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)
00713         
00714         ##############################################################
00715         #WARNING I am not sure if I am packing the right values
00716         if dbIOVBase == "runnumber":
00717             iov_since = str(payload.Run)
00718             iov_till  = iov_since
00719         elif dbIOVBase == "lumiid":
00720             iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
00721             iov_till  = str( pack(int(payload.Run), int(payload.IOVlast)) )
00722         elif dbIOVBase == "timestamp":
00723             error = "ERROR: IOV " + dbIOVBase + " still not implemented."
00724             exit(error)
00725         else:
00726             error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
00727             exit(error)
00728 
00729         if payloadNumber == 0:
00730             iovSinceFirst = iov_since
00731         if payloadNumber == len(payloadList)-1:
00732             iovTillLast   = iov_till
00733             
00734         appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
00735         os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)
00736 
00737         
00738     #### CREATE payload for merged output
00739 
00740     print " create MERGED payload card for dropbox ..."
00741 
00742     dfile = open(metadata_file,'w')
00743 
00744     dfile.write('destDB '  + destDB        +'\n')
00745     dfile.write('tag '     + databaseTag   +'\n')
00746     dfile.write('inputtag'                 +'\n')
00747     dfile.write('since '   + iovSinceFirst +'\n')
00748     #dfile.write('till '    + iov_till      +'\n')
00749     dfile.write('Timetype '+ dbIOVBase     +'\n')
00750 
00751     ###################################################
00752     # WARNING tagType forced to offline
00753     print "WARNING TAG TYPE forced to be just offline"
00754     tagType = "offline"
00755     checkType = tagType
00756     if tagType == "express":
00757         checkType = "hlt"
00758     dfile.write('IOVCheck ' + checkType + '\n')
00759     dfile.write('usertext Beam spot position\n')
00760             
00761     dfile.close()
00762 
00763                                                                                                 
00764 
00765     if option.upload:
00766         print " scp files to offline Drop Box"
00767         dropbox = "/DropBox"
00768         if option.Test:
00769             dropbox = "/DropBox_test"
00770         print "UPLOADING TO TEST DB"
00771         uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)
00772                    
00773     archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
00774     archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
00775     if not os.path.isdir(archiveDir + 'payloads'):
00776         os.mkdir(archiveDir + 'payloads')
00777     commands.getstatusoutput('mv ' + sqlite_file   + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
00778     commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
00779     commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)
00780   
00781     print archiveDir + "payloads/" + archive_sqlite_file_name + '.db'
00782     print archiveDir + "payloads/" + archive_sqlite_file_name + '.txt'
00783 
00784     rmLock()
00785     
00786 if __name__ == '__main__':  # entry guard: true only when this file is executed as a script, not when it is imported
00787     main()  # run the beam spot workflow implemented by main() above