Functions | |
def | aselectFilesToProcess |
def | compareLumiLists |
def | getLastClosedRun |
def | getLastUploadedIOV |
General functions. | |
def | getListOfFilesToProcess |
def | getListOfRunsAndLumiFromDBS |
def | getListOfRunsAndLumiFromFile |
def | getListOfRunsAndLumiFromRR |
def | getNewRunList |
def | getNumberOfFilesToProcessForRun |
def | getRunNumberFromDBSName |
def | getRunNumberFromFileName |
def | main |
def | removeUncompleteRuns |
def | selectFilesToProcess |
Variables | |
string | error = "Please set a crab environment in order to get the proper JSON lib" |
def BeamSpotWorkflow::aselectFilesToProcess | ( | listOfFilesToProcess, | |
newRunList | |||
) |
Definition at line 490 of file BeamSpotWorkflow.py.
00491 : 00492 selectedFiles = [] 00493 runsToProcess = {} 00494 processedRuns = {} 00495 for file in listOfFilesToProcess: 00496 run = getRunNumberFromDBSName(file) 00497 # print "To process: " + str(run) 00498 if run not in runsToProcess: 00499 runsToProcess[run] = 1 00500 else: 00501 runsToProcess[run] = runsToProcess[run] + 1 00502 00503 for file in newRunList: 00504 run = getRunNumberFromFileName(file) 00505 # print "Processed: " + str(run) 00506 if run not in processedRuns: 00507 processedRuns[run] = 1 00508 else: 00509 processedRuns[run] = processedRuns[run] + 1 00510 00511 #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered 00512 lastClosedRun = getLastClosedRun(listOfFilesToProcess) 00513 # print "LastClosedRun:-" + str(lastClosedRun) + "-" 00514 00515 processedRunsKeys = processedRuns.keys() 00516 processedRunsKeys.sort() 00517 00518 for run in processedRunsKeys: 00519 if run <= lastClosedRun : 00520 print "For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!" 00521 if not run in runsToProcess: 00522 exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!") 00523 lumiList = getDBSLumiListForRun(run) 00524 if processedRuns[run] == runsToProcess[run]: 00525 for file in newRunList: 00526 if run == getRunNumberFromFileName(file): 00527 selectedFiles.append(file) 00528 else: 00529 exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!") 00530 return selectedFiles
def BeamSpotWorkflow::compareLumiLists | ( | listA, | |
listB, | |||
errors = [] , |
|||
tolerance = 0 |
|||
) |
Definition at line 435 of file BeamSpotWorkflow.py.
00436 : 00437 lenA = len(listA) 00438 lenB = len(listB) 00439 if lenA < lenB-(lenB*float(tolerance)/100): 00440 errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB") 00441 #else: 00442 #errors.append("Lumi check ok!listA(" + str(lenA) + ")-(" + str(lenB) + ")listB") 00443 #print errors 00444 listA.sort() 00445 listB.sort() 00446 #shorter = lenA 00447 #if lenB < shorter: 00448 # shorter = lenB 00449 #a = 0 00450 #b = 0 00451 badA = [] 00452 badB = [] 00453 #print listB 00454 #print listA 00455 #print len(listA) 00456 #print len(listB) 00457 #counter = 1 00458 for lumi in listA: 00459 #print str(counter) + "->" + str(lumi) 00460 #counter += 1 00461 if not lumi in listB: 00462 errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB") 00463 badB.append(lumi) 00464 #print "Bad B: " + str(lumi) 00465 #exit("hola") 00466 for lumi in listB: 00467 if not lumi in listA: 00468 errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA") 00469 badA.append(lumi) 00470 #print "Bad A: " + str(lumi) 00471 00472 return badA,badB
def BeamSpotWorkflow::getLastClosedRun | ( | DBSListOfFiles | ) |
Definition at line 217 of file BeamSpotWorkflow.py.
00218 : 00219 runs = [] 00220 for file in DBSListOfFiles: 00221 runNumber = getRunNumberFromDBSName(file) 00222 if runs.count(runNumber) == 0: 00223 runs.append(runNumber) 00224 00225 if len(runs) <= 1: #No closed run 00226 return -1 00227 else: 00228 runs.sort() 00229 return long(runs[len(runs)-2])
def BeamSpotWorkflow::getLastUploadedIOV | ( | tagName, | |
destDB = "oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT" |
|||
) |
General functions.
Definition at line 56 of file BeamSpotWorkflow.py.
00056 ://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"): 00057 #return 582088327592295 00058 listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName 00059 dbError = commands.getstatusoutput( listIOVCommand ) 00060 if dbError[0] != 0 : 00061 if dbError[1].find("metadata entry \"" + tagName + "\" does not exist") != -1: 00062 print "Creating a new tag because I got the following error contacting the DB" 00063 print dbError[1] 00064 return 1 00065 #return 133928 00066 else: 00067 exit("ERROR: Can\'t connect to db because:\n" + dbError[1]) 00068 00069 00070 aCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'" 00071 output = commands.getstatusoutput( aCommand ) 00072 00073 #WARNING when we pass to lumi IOV this should be long long 00074 if output[1] == '': 00075 exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV") 00076 00077 return long(output[1]) 00078
def BeamSpotWorkflow::getListOfFilesToProcess | ( | dataSet, | |
lastRun = -1 |
|||
) |
Definition at line 80 of file BeamSpotWorkflow.py.
00081 : 00082 queryCommand = "dbs --search --query \"find file where dataset=" + dataSet 00083 if lastRun != -1: 00084 queryCommand = queryCommand + " and run > " + str(lastRun) 00085 queryCommand = queryCommand + "\" | grep .root" 00086 # print " >> " + queryCommand 00087 output = commands.getstatusoutput( queryCommand ) 00088 return output[1].split('\n')
def BeamSpotWorkflow::getListOfRunsAndLumiFromDBS | ( | dataSet, | |
lastRun = -1 |
|||
) |
Definition at line 100 of file BeamSpotWorkflow.py.
00101 : 00102 datasetList = dataSet.split(',') 00103 outputList = [] 00104 for data in datasetList: 00105 queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data 00106 if lastRun != -1: 00107 queryCommand = queryCommand + " and run > " + str(lastRun) 00108 queryCommand += "\"" 00109 print " >> " + queryCommand 00110 output = [] 00111 for i in range(0,3): 00112 output = commands.getstatusoutput( queryCommand ) 00113 if output[0] == 0 and not (output[1].find("ERROR") != -1 or output[1].find("Error") != -1) : 00114 break 00115 if output[0] != 0: 00116 exit("ERROR: I can't contact DBS for the following reason:\n" + output[1]) 00117 #print output[1] 00118 tmpList = output[1].split('\n') 00119 for file in tmpList: 00120 outputList.append(file) 00121 runsAndLumis = {} 00122 for out in outputList: 00123 regExp = re.search('(\d+)\s+(\d+)',out) 00124 if regExp: 00125 run = long(regExp.group(1)) 00126 lumi = long(regExp.group(2)) 00127 if not run in runsAndLumis: 00128 runsAndLumis[run] = [] 00129 runsAndLumis[run].append(lumi) 00130 00131 # print runsAndLumis 00132 # exit("ok") 00133 return runsAndLumis
def BeamSpotWorkflow::getListOfRunsAndLumiFromFile | ( | firstRun = -1 , |
|
fileName = "" |
|||
) |
Definition at line 135 of file BeamSpotWorkflow.py.
def BeamSpotWorkflow::getListOfRunsAndLumiFromRR | ( | firstRun = -1 | ) |
Definition at line 147 of file BeamSpotWorkflow.py.
00148 : 00149 RunReg ="http://pccmsdqm04.cern.ch/runregistry" 00150 #RunReg = "http://localhost:40010/runregistry" 00151 #Dataset=%Online% 00152 Group = "Collisions10" 00153 00154 # get handler to RR XML-RPC server 00155 FULLADDRESS=RunReg + "/xmlrpc" 00156 #print "RunRegistry from: ",FULLADDRESS 00157 server = xmlrpclib.ServerProxy(FULLADDRESS) 00158 #sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun) + " and {datasetName} LIKE '" + Dataset + "'" 00159 sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun) 00160 #sel_dcstable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1" 00161 00162 maxAttempts = 3; 00163 tries = 0; 00164 while tries<maxAttempts: 00165 try: 00166 run_data = server.DataExporter.export('RUN' , 'GLOBAL', 'csv_runs', sel_runtable) 00167 #dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json' , sel_dcstable) 00168 break 00169 except: 00170 print "Something wrong in accessing runregistry, retrying in 2s....", tries, "/", maxAttempts 00171 tries += 1 00172 time.sleep(2) 00173 if tries==maxAttempts: 00174 error = "Run registry unaccessible.....exiting now" 00175 return {}; 00176 00177 00178 listOfRuns=[] 00179 for line in run_data.split("\n"): 00180 run=line.split(',')[0] 00181 if run.isdigit(): 00182 listOfRuns.append(run) 00183 00184 00185 firstRun = listOfRuns[len(listOfRuns)-1]; 00186 lastRun = listOfRuns[0]; 00187 sel_dcstable="{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1" 00188 00189 tries = 0; 00190 while tries<maxAttempts: 00191 try: 00192 #run_data = server.DataExporter.export('RUN' , 'GLOBAL', 'csv_runs', sel_runtable) 00193 dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json' , sel_dcstable) 00194 break 00195 except: 00196 print "I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....", tries, "/", maxAttempts 00197 tries += 1 00198 time.sleep(2) 00199 if tries==maxAttempts: 00200 error = "Run registry unaccessible.....exiting now" 00201 return {}; 00202 00203 selected_dcs={} 00204 jsonList=json.loads(dcs_data) 00205 00206 #for element in jsonList: 00207 for element in listOfRuns: 00208 #if element in listOfRuns: 00209 if element in jsonList: 00210 selected_dcs[long(element)]=jsonList[element] 00211 else: 00212 print "WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!" 00213 selected_dcs[long(element)]= [[]] 00214 #print selected_dcs 00215 return selected_dcs
def BeamSpotWorkflow::getNewRunList | ( | fromDir, | |
lastUploadedIOV | |||
) |
Definition at line 246 of file BeamSpotWorkflow.py.
def BeamSpotWorkflow::getNumberOfFilesToProcessForRun | ( | dataSet, | |
run | |||
) |
Definition at line 90 of file BeamSpotWorkflow.py.
00091 : 00092 queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root" 00093 #print " >> " + queryCommand 00094 output = commands.getstatusoutput( queryCommand ) 00095 if output[0] != 0: 00096 return 0 00097 else: 00098 return len(output[1].split('\n'))
def BeamSpotWorkflow::getRunNumberFromDBSName | ( | fileName | ) |
Definition at line 239 of file BeamSpotWorkflow.py.
def BeamSpotWorkflow::getRunNumberFromFileName | ( | fileName | ) |
Definition at line 231 of file BeamSpotWorkflow.py.
def BeamSpotWorkflow::main | ( | ) |
Definition at line 532 of file BeamSpotWorkflow.py.
00533 : 00534 ######### COMMAND LINE OPTIONS ############## 00535 option,args = parse(__doc__) 00536 00537 ######### Check if there is already a megascript running ######## 00538 if option.lock: 00539 setLockName('.' + option.lock) 00540 if checkLock(): 00541 print "There is already a megascript runnning...exiting" 00542 return 00543 else: 00544 lock() 00545 00546 00547 destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT' 00548 if option.Test: 00549 destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT' 00550 00551 ######### CONFIGURATION FILE ################ 00552 cfgFile = "BeamSpotWorkflow.cfg" 00553 if option.cfg: 00554 cfgFile = option.cfg 00555 configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile 00556 configuration = ConfigParser.ConfigParser() 00557 print 'Reading configuration from ', configurationFile 00558 configuration.read(configurationFile) 00559 00560 sourceDir = configuration.get('Common','SOURCE_DIR') 00561 archiveDir = configuration.get('Common','ARCHIVE_DIR') 00562 workingDir = configuration.get('Common','WORKING_DIR') 00563 databaseTag = configuration.get('Common','DBTAG') 00564 dataSet = configuration.get('Common','DATASET') 00565 fileIOVBase = configuration.get('Common','FILE_IOV_BASE') 00566 dbIOVBase = configuration.get('Common','DB_IOV_BASE') 00567 dbsTolerance = float(configuration.get('Common','DBS_TOLERANCE')) 00568 dbsTolerancePercent = float(configuration.get('Common','DBS_TOLERANCE_PERCENT')) 00569 rrTolerance = float(configuration.get('Common','RR_TOLERANCE')) 00570 missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE')) 00571 missingLumisTimeout = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT')) 00572 jsonFileName = configuration.get('Common','JSON_FILE') 00573 mailList = configuration.get('Common','EMAIL') 00574 00575 ######### DIRECTORIES SETUP ################# 00576 if sourceDir[len(sourceDir)-1] != '/': 00577 sourceDir = sourceDir + '/' 00578 if not dirExists(sourceDir): 00579 error = "ERROR: The source directory " + sourceDir + " doesn't exist!" 00580 sendEmail(mailList,error) 00581 exit(error) 00582 00583 if archiveDir[len(archiveDir)-1] != '/': 00584 archiveDir = archiveDir + '/' 00585 if not os.path.isdir(archiveDir): 00586 os.mkdir(archiveDir) 00587 00588 if workingDir[len(workingDir)-1] != '/': 00589 workingDir = workingDir + '/' 00590 if not os.path.isdir(workingDir): 00591 os.mkdir(workingDir) 00592 else: 00593 os.system("rm -f "+ workingDir + "*") 00594 00595 00596 print "Getting last IOV for tag: " + databaseTag 00597 lastUploadedIOV = 1 00598 if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT": 00599 lastUploadedIOV = getLastUploadedIOV(databaseTag) 00600 else: 00601 lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB) 00602 00603 #lastUploadedIOV = 133885 00604 #lastUploadedIOV = 575216380019329 00605 if dbIOVBase == "lumiid": 00606 lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"] 00607 00608 ######### Get list of files processed after the last IOV 00609 print "Getting list of files processed after IOV " + str(lastUploadedIOV) 00610 newProcessedRunList = getNewRunList(sourceDir,lastUploadedIOV) 00611 if len(newProcessedRunList) == 0: 00612 exit("There are no new runs after " + str(lastUploadedIOV)) 00613 00614 ######### Copy files to archive directory 00615 print "Copying files to archive directory" 00616 copiedFiles = [] 00617 for i in range(3): 00618 copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList) 00619 if len(copiedFiles) == len(newProcessedRunList): 00620 break; 00621 if len(copiedFiles) != len(newProcessedRunList): 00622 error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList)) 00623 sendEmail(mailList,error) 00624 exit(error) 00625 00626 00627 ######### Get from DBS the list of files after last IOV 00628 #listOfFilesToProcess = getListOfFilesToProcess(dataSet,lastUploadedIOV) 00629 print "Getting list of files from DBS" 00630 listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV) 00631 if len(listOfRunsAndLumiFromDBS) == 0: 00632 exit("There are no files in DBS to process") 00633 print "Getting list of files from RR" 00634 listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromRR(lastUploadedIOV) 00635 if(not listOfRunsAndLumiFromRR): 00636 print "Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName 00637 listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName) 00638 ######### Get list of files to process for DB 00639 #selectedFilesToProcess = selectFilesToProcess(listOfFilesToProcess,copiedFiles) 00640 #completeProcessedRuns = removeUncompleteRuns(copiedFiles,dataSet) 00641 #print copiedFiles 00642 #print completeProcessedRuns 00643 #exit("complete") 00644 print "Getting list of files to process" 00645 selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout) 00646 if len(selectedFilesToProcess) == 0: 00647 exit("There are no files to process") 00648 00649 #print selectedFilesToProcess 00650 ######### Copy files to working directory 00651 print "Copying files from archive to working directory" 00652 copiedFiles = [] 00653 for i in range(3): 00654 copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess) 00655 if len(copiedFiles) == len(selectedFilesToProcess): 00656 break; 00657 else: 00658 commands.getstatusoutput("rm -rf " + workingDir) 00659 if len(copiedFiles) != len(selectedFilesToProcess): 00660 error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir 00661 sendEmail(mailList,error) 00662 exit(error) 00663 00664 print "Sorting and cleaning beamlist" 00665 beamSpotObjList = [] 00666 for fileName in copiedFiles: 00667 readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase) 00668 00669 sortAndCleanBeamList(beamSpotObjList,fileIOVBase) 00670 00671 if len(beamSpotObjList) == 0: 00672 error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run." 00673 exit(error) 00674 00675 payloadFileName = "PayloadFile.txt" 00676 00677 runBased = False 00678 if dbIOVBase == "runnumber": 00679 runBased = True 00680 00681 payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased) 00682 if len(payloadList) == 0: 00683 error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects." 00684 exit(error) 00685 00686 00687 tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt" 00688 tmpSqliteFileName = workingDir + "SingleTmpSqliteFile.db" 00689 00690 writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py" 00691 readDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py" 00692 payloadNumber = -1 00693 iovSinceFirst = '0'; 00694 iovTillLast = '0'; 00695 00696 #Creating the final name for the combined sqlite file 00697 uuid = commands.getstatusoutput('uuidgen -t')[1] 00698 final_sqlite_file_name = databaseTag + '@' + uuid 00699 sqlite_file = workingDir + final_sqlite_file_name + ".db" 00700 metadata_file = workingDir + final_sqlite_file_name + ".txt" 00701 00702 for payload in payloadList: 00703 payloadNumber += 1 00704 if option.zlarge: 00705 payload.sigmaZ = 10 00706 payload.sigmaZerr = 2.5e-05 00707 tmpFile = file(tmpPayloadFileName,'w') 00708 dumpValues(payload,tmpFile) 00709 tmpFile.close() 00710 if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir): 00711 error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName 00712 exit(error) 00713 readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir) 00714 00715 ############################################################## 00716 #WARNING I am not sure if I am packing the right values 00717 if dbIOVBase == "runnumber": 00718 iov_since = str(payload.Run) 00719 iov_till = iov_since 00720 elif dbIOVBase == "lumiid": 00721 iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) ) 00722 iov_till = str( pack(int(payload.Run), int(payload.IOVlast)) ) 00723 elif dbIOVBase == "timestamp": 00724 error = "ERROR: IOV " + dbIOVBase + " still not implemented." 00725 exit(error) 00726 else: 00727 error = "ERROR: IOV " + dbIOVBase + " unrecognized!" 00728 exit(error) 00729 00730 if payloadNumber == 0: 00731 iovSinceFirst = iov_since 00732 if payloadNumber == len(payloadList)-1: 00733 iovTillLast = iov_till 00734 00735 appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir) 00736 os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName) 00737 00738 00739 #### CREATE payload for merged output 00740 00741 print " create MERGED payload card for dropbox ..." 00742 00743 dfile = open(metadata_file,'w') 00744 00745 dfile.write('destDB ' + destDB +'\n') 00746 dfile.write('tag ' + databaseTag +'\n') 00747 dfile.write('inputtag' +'\n') 00748 dfile.write('since ' + iovSinceFirst +'\n') 00749 #dfile.write('till ' + iov_till +'\n') 00750 dfile.write('Timetype '+ dbIOVBase +'\n') 00751 00752 ################################################### 00753 # WARNING tagType forced to offline 00754 print "WARNING TAG TYPE forced to be just offline" 00755 tagType = "offline" 00756 checkType = tagType 00757 if tagType == "express": 00758 checkType = "hlt" 00759 dfile.write('IOVCheck ' + checkType + '\n') 00760 dfile.write('usertext Beam spot position\n') 00761 00762 dfile.close() 00763 00764 00765 00766 if option.upload: 00767 print " scp files to offline Drop Box" 00768 dropbox = "/DropBox" 00769 if option.Test: 00770 dropbox = "/DropBox_test" 00771 print "UPLOADING TO TEST DB" 00772 uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox) 00773 00774 archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name 00775 archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt" 00776 if not os.path.isdir(archiveDir + 'payloads'): 00777 os.mkdir(archiveDir + 'payloads') 00778 commands.getstatusoutput('mv ' + sqlite_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db') 00779 commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt') 00780 commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name) 00781 00782 print archiveDir + "payloads/" + archive_sqlite_file_name + '.db' 00783 print archiveDir + "payloads/" + archive_sqlite_file_name + '.txt' 00784 00785 rmLock()
def BeamSpotWorkflow::removeUncompleteRuns | ( | newRunList, | |
dataSet | |||
) |
Definition at line 474 of file BeamSpotWorkflow.py.
00475 : 00476 processedRuns = {} 00477 for fileName in newRunList: 00478 run = getRunNumberFromFileName(fileName) 00479 if not run in processedRuns: 00480 processedRuns[run] = 0 00481 processedRuns[run] += 1 00482 00483 for run in processedRuns.keys(): 00484 nFiles = getNumberOfFilesToProcessForRun(dataSet,run) 00485 if processedRuns[run] < nFiles: 00486 print "I haven't processed all files yet : " + str(processedRuns[run]) + " out of " + str(nFiles) + " for run: " + str(run) 00487 else: 00488 print "All files have been processed for run: " + str(run) + " (" + str(processedRuns[run]) + " out of " + str(nFiles) + ")"
def BeamSpotWorkflow::selectFilesToProcess | ( | listOfRunsAndLumiFromDBS, | |
listOfRunsAndLumiFromRR, | |||
newRunList, | |||
runListDir, | |||
dataSet, | |||
mailList, | |||
dbsTolerance, | |||
dbsTolerancePercent, | |||
rrTolerance, | |||
missingFilesTolerance, | |||
missingLumisTimeout | |||
) |
Definition at line 257 of file BeamSpotWorkflow.py.
00258 : 00259 runsAndLumisProcessed = {} 00260 runsAndFiles = {} 00261 for fileName in newRunList: 00262 file = open(runListDir+fileName) 00263 for line in file: 00264 if line.find("Runnumber") != -1: 00265 run = long(line.replace('\n','').split(' ')[1]) 00266 elif line.find("LumiRange") != -1: 00267 lumiLine = line.replace('\n','').split(' ') 00268 begLumi = long(lumiLine[1]) 00269 endLumi = long(lumiLine[3]) 00270 if begLumi != endLumi: 00271 error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName 00272 exit(error) 00273 else: 00274 if not run in runsAndLumisProcessed: 00275 runsAndLumisProcessed[run] = [] 00276 if begLumi in runsAndLumisProcessed[run]: 00277 print "Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!" 00278 else: 00279 runsAndLumisProcessed[run].append(begLumi) 00280 if not run in runsAndFiles: 00281 runsAndFiles[run] = [] 00282 runsAndFiles[run].append(fileName) 00283 file.close() 00284 00285 rrKeys = listOfRunsAndLumiFromRR.keys() 00286 rrKeys.sort() 00287 dbsKeys = listOfRunsAndLumiFromDBS.keys() 00288 dbsKeys.sort() 00289 #I remove the last entry from DBS since I am not sure it is an already closed run! 00290 lastUnclosedRun = dbsKeys.pop() 00291 #print "Last unclosed run: " + str(lastUnclosedRun) 00292 procKeys = runsAndLumisProcessed.keys() 00293 procKeys.sort() 00294 #print "Run Registry:" 00295 #print rrKeys 00296 #print "DBS:" 00297 #print dbsKeys 00298 #print "List:" 00299 #print procKeys 00300 #print lastUnclosedRun 00301 filesToProcess = [] 00302 for run in rrKeys: 00303 RRList = [] 00304 for lumiRange in listOfRunsAndLumiFromRR[run]: 00305 if lumiRange != []: 00306 for l in range(lumiRange[0],lumiRange[1]+1): 00307 RRList.append(long(l)) 00308 if run in procKeys and run < lastUnclosedRun: 00309 #print "run " + str(run) + " is in procKeys" 00310 if not run in dbsKeys and run != lastUnclosedRun: 00311 error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!" 00312 exit(error) 00313 print "Working on run " + str(run) 00314 nFiles = 0 00315 for data in dataSet.split(','): 00316 nFiles = getNumberOfFilesToProcessForRun(data,run) 00317 if nFiles != 0: 00318 break 00319 if len(runsAndFiles[run]) < nFiles: 00320 print "I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run) 00321 if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance: 00322 timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout 00323 timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout) 00324 if timeoutType == 1: 00325 print "WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!" 00326 else: 00327 if timeoutType == -1: 00328 print "WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!" 00329 else: 00330 print "WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress." 00331 return filesToProcess 00332 else: 00333 timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout) 00334 if timeoutType == 1: 00335 error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")" 00336 sendEmail(mailList,error) 00337 return filesToProcess 00338 #exit(error) 00339 else: 00340 if timeoutType == -1: 00341 print "WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!" 00342 else: 00343 print "WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress." 00344 return filesToProcess 00345 00346 else: 00347 timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) 00348 timeoutManager("DBS_MISMATCH_Run"+str(run)) 00349 print "I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!" 00350 errors = [] 00351 badProcessed = [] 00352 badDBSProcessed = [] 00353 badDBS = [] 00354 badRRProcessed = [] 00355 badRR = [] 00356 #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not == 00357 badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors) 00358 for i in range(0,len(errors)): 00359 errors[i] = errors[i].replace("listA","the processed lumis") 00360 errors[i] = errors[i].replace("listB","DBS") 00361 #print errors 00362 #print badProcessed 00363 #print badDBS 00364 #exit("ciao") 00365 if len(badDBS) != 0: 00366 print "This is weird because I processed more lumis than the ones that are in DBS!" 00367 if len(badDBSProcessed) != 0 and run in rrKeys: 00368 lastError = len(errors) 00369 #print RRList 00370 #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not == 00371 badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors) 00372 for i in range(0,len(errors)): 00373 errors[i] = errors[i].replace("listA","the processed lumis") 00374 errors[i] = errors[i].replace("listB","Run Registry") 00375 #print errors 00376 #print badProcessed 00377 #print badRunRegistry 00378 00379 if len(badRRProcessed) != 0: 00380 print "I have not processed some of the lumis that are in the run registry for run: " + str(run) 00381 for lumi in badDBSProcessed: 00382 if lumi in badRRProcessed: 00383 badProcessed.append(lumi) 00384 lenA = len(badProcessed) 00385 lenB = len(RRList) 00386 if 100.*lenA/lenB <= dbsTolerancePercent: 00387 print "WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis" 00388 #print errors 00389 badProcessed = [] 00390 elif lenA <= dbsTolerance: 00391 print "WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis" 00392 #print errors 00393 badProcessed = [] 00394 else: 00395 error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!" 00396 sendEmail(mailList,error) 00397 print error 00398 return filesToProcess 00399 #exit(errors) 00400 #return filesToProcess 00401 elif len(errors) != 0: 00402 print "The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!" 00403 #print errors 00404 00405 #If I get here it means that I passed or the DBS or the RR test 00406 if len(badProcessed) == 0: 00407 for file in runsAndFiles[run]: 00408 filesToProcess.append(file) 00409 else: 00410 #print errors 00411 print "This should never happen because if I have errors I return or exit! Run: " + str(run) 00412 else: 00413 error = "Run " + str(run) + " is in the run registry but it has not been processed yet!" 00414 print error 00415 timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout) 00416 if timeoutType == 1: 00417 if len(RRList) <= rrTolerance: 00418 error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... " 00419 #print listOfRunsAndLumiFromRR[run] 00420 print error 00421 #sendEmail(mailList,error) 00422 else: 00423 error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one" 00424 sendEmail(mailList,error) 00425 return filesToProcess 00426 #exit(error) 00427 else: 00428 if timeoutType == -1: 00429 print "WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!" 00430 else: 00431 print "WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress." 00432 return filesToProcess 00433 return filesToProcess
string BeamSpotWorkflow::error = "Please set a crab environment in order to get the proper JSON lib" |
Definition at line 50 of file BeamSpotWorkflow.py.