Functions | |
def | aselectFilesToProcess |
def | compareLumiLists |
def | getLastClosedRun |
def | getLastUploadedIOV |
General functions. | |
def | getListOfFilesToProcess |
def | getListOfRunsAndLumiFromDBS |
def | getListOfRunsAndLumiFromRR |
def | getNewRunList |
def | getNumberOfFilesToProcessForRun |
def | getRunNumberFromDBSName |
def | getRunNumberFromFileName |
def | main |
def | removeUncompleteRuns |
def | selectFilesToProcess |
Variables | |
string | error = "Please set a crab environment in order to get the proper JSON lib" |
def BeamSpotWorkflow::aselectFilesToProcess | ( | listOfFilesToProcess, | |
newRunList | |||
) |
Definition at line 459 of file BeamSpotWorkflow.py.
def aselectFilesToProcess(listOfFilesToProcess, newRunList):
    """Select the result files of runs whose file count matches DBS.

    listOfFilesToProcess -- DBS file names; the run of each one is obtained
                            with getRunNumberFromDBSName.
    newRunList           -- result file names; the run of each one is
                            obtained with getRunNumberFromFileName.

    Returns the entries of newRunList that belong to runs not newer than the
    last closed run (see getLastClosedRun) and for which the number of
    result files equals the number of DBS files.

    Calls exit() with an error message when a processed run is unknown to
    DBS or when the two file counts differ.
    """
    selectedFiles = []

    # Number of DBS files per run.
    runsToProcess = {}
    for dbsName in listOfFilesToProcess:
        run = getRunNumberFromDBSName(dbsName)
        runsToProcess[run] = runsToProcess.get(run, 0) + 1

    # Number of result files per run.
    processedRuns = {}
    for resultName in newRunList:
        run = getRunNumberFromFileName(resultName)
        processedRuns[run] = processedRuns.get(run, 0) + 1

    #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
    lastClosedRun = getLastClosedRun(listOfFilesToProcess)

    for run in sorted(processedRuns.keys()):
        if run <= lastClosedRun:
            # The existence check has to come before the progress message:
            # the message indexes runsToProcess[run] and would otherwise
            # die with a KeyError instead of the intended error text.
            if run not in runsToProcess:
                exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
            print("For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!")
            # NOTE(review): the result of this call was never used; the call
            # is kept in case the helper has side effects -- confirm and drop.
            getDBSLumiListForRun(run)
            if processedRuns[run] == runsToProcess[run]:
                for resultName in newRunList:
                    if run == getRunNumberFromFileName(resultName):
                        selectedFiles.append(resultName)
            else:
                exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
    return selectedFiles
def BeamSpotWorkflow::compareLumiLists | ( | listA, | |
listB, | |||
errors = [] , |
|||
tolerance = 0 |
|||
) |
Definition at line 404 of file BeamSpotWorkflow.py.
def compareLumiLists(listA, listB, errors=None, tolerance=0):
    """Compare two lists of lumi sections and report the differences.

    listA, listB -- lists of lumi section numbers. Both are sorted in place,
                    as the original implementation did. Elements must be
                    hashable (they are integers in the visible callers).
    errors       -- optional list that receives one human readable message
                    per problem found. A fresh list is used when omitted, so
                    messages never leak from one call to the next.
    tolerance    -- percentage of len(listB) by which listA may be shorter
                    before a length error is recorded.

    Returns the tuple (badA, badB):
      badA -- lumis present in listB but missing from listA
      badB -- lumis present in listA but missing from listB
    """
    if errors is None:
        errors = []

    lenA = len(listA)
    lenB = len(listB)
    # The check is deliberately one sided: only a too short listA is an error.
    if lenA < lenB-(lenB*float(tolerance)/100):
        errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")

    listA.sort()
    listB.sort()

    # Set lookups keep the two passes linear instead of quadratic.
    lumisInA = set(listA)
    lumisInB = set(listB)

    badA = []
    badB = []
    for lumi in listA:
        if lumi not in lumisInB:
            errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
            badB.append(lumi)
    for lumi in listB:
        if lumi not in lumisInA:
            errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
            badA.append(lumi)

    return badA,badB
def BeamSpotWorkflow::getLastClosedRun | ( | DBSListOfFiles | ) |
Definition at line 186 of file BeamSpotWorkflow.py.
def getLastClosedRun(DBSListOfFiles):
    """Return the second highest run number found in a list of DBS files.

    The highest run is treated as possibly still open, so the one before it
    is reported. Returns -1 when fewer than two distinct runs are present.
    """
    distinctRuns = sorted(set(getRunNumberFromDBSName(dbsName) for dbsName in DBSListOfFiles))
    if len(distinctRuns) < 2:
        #No closed run
        return -1
    return long(distinctRuns[-2])
def BeamSpotWorkflow::getLastUploadedIOV | ( | tagName, | |
destDB = "oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT" |
|||
) |
General functions.
Definition at line 56 of file BeamSpotWorkflow.py.
def getLastUploadedIOV(tagName, destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
    """Return the last IOV stored for tagName in the conditions database.

    The IOV list is obtained by running cmscond_list_iov in a shell.

    Returns 1 when the tool reports that the tag does not exist yet (a new
    tag will be created), otherwise the first column of the last "DB=" line
    of the listing, converted to long.

    Calls exit() when the database cannot be contacted for any other reason
    or when no IOV value can be extracted from the listing.
    """
    listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName
    status, report = commands.getstatusoutput(listIOVCommand)
    if status != 0:
        unknownTagMessage = "metadata entry \"" + tagName + "\" does not exist"
        if unknownTagMessage not in report:
            exit("ERROR: Can\'t connect to db because:\n" + report)
        print("Creating a new tag because I got the following error contacting the DB")
        print(report)
        return 1

    # Second invocation: keep only the since value of the last IOV line.
    lastIOVCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
    lastIOV = commands.getstatusoutput(lastIOVCommand)[1]

    #WARNING when we pass to lumi IOV this should be long long
    if lastIOV == '':
        exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")

    return long(lastIOV)
def BeamSpotWorkflow::getListOfFilesToProcess | ( | dataSet, | |
lastRun = -1 |
|||
) |
Definition at line 80 of file BeamSpotWorkflow.py.
def getListOfFilesToProcess(dataSet, lastRun=-1):
    """Return the .root file names DBS lists for dataSet.

    dataSet -- dataset name inserted verbatim into the DBS query.
    lastRun -- when different from -1 only files of runs greater than this
               value are requested.

    The query is executed through the shell and its output is filtered with
    grep. Returns a list with one entry per non-empty output line; an empty
    list when the query produced no output.
    """
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet
    if lastRun != -1:
        queryCommand = queryCommand + " and run > " + str(lastRun)
    queryCommand = queryCommand + "\" | grep .root"
    output = commands.getstatusoutput( queryCommand )
    # ''.split('\n') is [''], so blank lines are dropped to avoid handing
    # callers a bogus empty file name when nothing matched.
    return [name for name in output[1].split('\n') if name.strip()]
def BeamSpotWorkflow::getListOfRunsAndLumiFromDBS | ( | dataSet, | |
lastRun = -1 |
|||
) |
Definition at line 100 of file BeamSpotWorkflow.py.
def getListOfRunsAndLumiFromDBS(dataSet, lastRun=-1):
    """Query DBS for the (run, lumi) pairs of one or more datasets.

    dataSet -- comma separated list of dataset names; one query per name.
    lastRun -- when different from -1 only runs greater than this value are
               requested.

    Each query is attempted up to three times. An attempt counts as failed
    when the command exits with a non-zero status or its output contains
    "ERROR" or "Error". Calls exit() when all three attempts fail.

    Returns a dictionary mapping run number (long) to the list of lumi
    section numbers (long) in the order DBS reported them.
    """
    runLumiPattern = re.compile(r'(\d+)\s+(\d+)')

    outputList = []
    for data in dataSet.split(','):
        queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
        if lastRun != -1:
            queryCommand = queryCommand + " and run > " + str(lastRun)
        queryCommand += "\""
        print(" >> " + queryCommand)

        succeeded = False
        report = ""
        for attempt in range(0,3):
            status, report = commands.getstatusoutput( queryCommand )
            succeeded = status == 0 and "ERROR" not in report and "Error" not in report
            if succeeded:
                break
        # An error text returned with exit status 0 is also a failure;
        # previously it slipped through and was parsed as if it were data.
        if not succeeded:
            exit("ERROR: I can't contact DBS for the following reason:\n" + report)
        outputList.extend(report.split('\n'))

    runsAndLumis = {}
    for out in outputList:
        regExp = runLumiPattern.search(out)
        if regExp:
            run = long(regExp.group(1))
            lumi = long(regExp.group(2))
            runsAndLumis.setdefault(run, []).append(lumi)

    return runsAndLumis
def BeamSpotWorkflow::getListOfRunsAndLumiFromRR | ( | lastRun = -1 | ) |
Definition at line 135 of file BeamSpotWorkflow.py.
def getListOfRunsAndLumiFromRR(lastRun=-1):
    """Fetch from the Run Registry the good lumi ranges of recent runs.

    lastRun -- only runs with a number greater than this value are requested.

    Two exports are requested over XML-RPC for the "Collisions10" group: the
    run table as CSV and the run/lumi-section table as JSON, the latter
    restricted to lumi sections where every listed DCS flag equals 1.
    The pair of requests is attempted up to ten times, five seconds apart;
    exit() is called when every attempt fails.

    Returns a dictionary mapping run number (long) to the value the JSON
    export holds for that run, or to [[]] for runs that appear in the run
    table but not in the JSON export.
    """
    RunReg ="https://pccmsdqm04.cern.ch/runregistry"
    #RunReg = "https://localhost:40010/runregistry"
    Group = "Collisions10"

    # get handler to RR XML-RPC server
    FULLADDRESS=RunReg + "/xmlrpc"
    server = xmlrpclib.ServerProxy(FULLADDRESS)
    sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(lastRun)
    sel_dcstable="{groupName} ='" + Group + "' and {runNumber} > " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"

    for attempt in range(10):
        try:
            run_data = server.DataExporter.export('RUN' , 'GLOBAL', 'csv_runs', sel_runtable)
            dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json' , sel_dcstable)
            break
        # Exception rather than a bare except, so Ctrl-C and exit() are not
        # swallowed while the loop keeps retrying.
        except Exception:
            print("Something wrong in accessing runregistry, retrying in 5s....")
            time.sleep(5)
    else:
        # Reached only when none of the ten attempts hit the break above.
        exit("Run registry unaccessible.....exiting now")

    # First CSV column of every data row is the run number; header and
    # blank lines are skipped by the isdigit() test.
    listOfRuns=[]
    for line in run_data.split("\n"):
        run=line.split(',')[0]
        if run.isdigit():
            listOfRuns.append(run)

    selected_dcs={}
    jsonList=json.loads(dcs_data)

    for element in listOfRuns:
        if element in jsonList:
            selected_dcs[long(element)]=jsonList[element]
        else:
            print("WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!")
            selected_dcs[long(element)]= [[]]
    return selected_dcs
def BeamSpotWorkflow::getNewRunList | ( | fromDir, | |
lastUploadedIOV | |||
) |
Definition at line 215 of file BeamSpotWorkflow.py.
def BeamSpotWorkflow::getNumberOfFilesToProcessForRun | ( | dataSet, | |
run | |||
) |
Definition at line 90 of file BeamSpotWorkflow.py.
def getNumberOfFilesToProcessForRun(dataSet, run):
    """Return how many .root files DBS lists for one run of a dataset.

    The DBS query is run through the shell and filtered with grep. A
    non-zero exit status of that pipeline is reported as 0 files; otherwise
    the number of output lines is returned.
    """
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
    status, listing = commands.getstatusoutput(queryCommand)
    if status == 0:
        return len(listing.split('\n'))
    return 0
def BeamSpotWorkflow::getRunNumberFromDBSName | ( | fileName | ) |
Definition at line 208 of file BeamSpotWorkflow.py.
def BeamSpotWorkflow::getRunNumberFromFileName | ( | fileName | ) |
Definition at line 200 of file BeamSpotWorkflow.py.
def BeamSpotWorkflow::main | ( | ) |
Definition at line 501 of file BeamSpotWorkflow.py.
def main():
    # Entry point of the beam spot workflow: reads the configuration, finds
    # the result files produced after the last uploaded IOV, validates them
    # against DBS and the Run Registry, builds the payloads, writes them to
    # a combined sqlite file, optionally uploads it and archives the output.
    #
    # NOTE(review): this listing was flattened by the documentation
    # extraction; the indentation below was reconstructed from the
    # statement order -- confirm against the real file.
    ######### COMMAND LINE OPTIONS ##############
    option,args = parse(__doc__)

    ######### Check if there is already a megascript running ########
    if option.lock:
        setLockName('.' + option.lock)
        if checkLock():
            print "There is already a megascript runnning...exiting"
            return
        else:
            lock()

    destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
    if option.Test:
        destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'

    ######### CONFIGURATION FILE ################
    cfgFile = "BeamSpotWorkflow.cfg"
    if option.cfg:
        cfgFile = option.cfg
    # NOTE(review): os.getenv returns None when CMSSW_BASE is unset, which
    # makes this concatenation raise a TypeError.
    configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
    configuration = ConfigParser.ConfigParser()
    print 'Reading configuration from ', configurationFile
    configuration.read(configurationFile)

    sourceDir = configuration.get('Common','SOURCE_DIR')
    archiveDir = configuration.get('Common','ARCHIVE_DIR')
    workingDir = configuration.get('Common','WORKING_DIR')
    databaseTag = configuration.get('Common','DBTAG')
    dataSet = configuration.get('Common','DATASET')
    fileIOVBase = configuration.get('Common','FILE_IOV_BASE')
    dbIOVBase = configuration.get('Common','DB_IOV_BASE')
    dbsTolerance = float(configuration.get('Common','DBS_TOLERANCE'))
    dbsTolerancePercent = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
    rrTolerance = float(configuration.get('Common','RR_TOLERANCE'))
    missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
    missingLumisTimeout = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
    mailList = configuration.get('Common','EMAIL')

    ######### DIRECTORIES SETUP #################
    # Every directory name is normalised to end with '/'.
    if sourceDir[len(sourceDir)-1] != '/':
        sourceDir = sourceDir + '/'
    if not dirExists(sourceDir):
        error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
        sendEmail(mailList,error)
        exit(error)

    if archiveDir[len(archiveDir)-1] != '/':
        archiveDir = archiveDir + '/'
    if not os.path.isdir(archiveDir):
        os.mkdir(archiveDir)

    if workingDir[len(workingDir)-1] != '/':
        workingDir = workingDir + '/'
    if not os.path.isdir(workingDir):
        os.mkdir(workingDir)
    else:
        # An existing working directory is emptied before use.
        os.system("rm -f "+ workingDir + "*")

    print "Getting last IOV for tag: " + databaseTag
    lastUploadedIOV = 1
    # For the production destination getLastUploadedIOV is called with its
    # own default database; otherwise destDB is passed explicitly.
    if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT":
        lastUploadedIOV = getLastUploadedIOV(databaseTag)
    else:
        lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)

    # With lumi based IOVs only the run part of the packed value is used.
    if dbIOVBase == "lumiid":
        lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]

    ######### Get list of files processed after the last IOV
    print "Getting list of files processed after IOV " + str(lastUploadedIOV)
    newProcessedRunList = getNewRunList(sourceDir,lastUploadedIOV)
    if len(newProcessedRunList) == 0:
        exit("There are no new runs after " + str(lastUploadedIOV))

    ######### Copy files to archive directory
    print "Copying files to archive directory"
    copiedFiles = []
    # Up to three attempts to copy every new file.
    for i in range(3):
        copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)
        if len(copiedFiles) == len(newProcessedRunList):
            break;
    if len(copiedFiles) != len(newProcessedRunList):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList))
        sendEmail(mailList,error)
        exit(error)

    ######### Get from DBS the list of files after last IOV
    print "Getting list of files from DBS"
    listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
    if len(listOfRunsAndLumiFromDBS) == 0:
        exit("There are no files in DBS to process")
    print "Getting list of files from RR"
    listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromRR(lastUploadedIOV)

    ######### Get list of files to process for DB
    print "Getting list of files to process"
    selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
    if len(selectedFilesToProcess) == 0:
        exit("There are no files to process")

    ######### Copy files to working directory
    print "Copying files from archive to working directory"
    copiedFiles = []
    for i in range(3):
        copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)
        if len(copiedFiles) == len(selectedFilesToProcess):
            break;
        else:
            # NOTE(review): this removes the working directory itself, not
            # just its content, before the next copy attempt -- confirm that
            # cp recreates the destination.
            commands.getstatusoutput("rm -rf " + workingDir)
    if len(copiedFiles) != len(selectedFilesToProcess):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir
        sendEmail(mailList,error)
        exit(error)

    print "Sorting and cleaning beamlist"
    beamSpotObjList = []
    for fileName in copiedFiles:
        readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)

    sortAndCleanBeamList(beamSpotObjList,fileIOVBase)

    if len(beamSpotObjList) == 0:
        error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
        exit(error)

    payloadFileName = "PayloadFile.txt"

    runBased = False
    if dbIOVBase == "runnumber":
        runBased = True

    payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
    if len(payloadList) == 0:
        error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
        exit(error)

    tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
    tmpSqliteFileName = workingDir + "SingleTmpSqliteFile.db"

    writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
    readDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
    payloadNumber = -1
    iovSinceFirst = '0';
    iovTillLast = '0';

    #Creating the final name for the combined sqlite file
    uuid = commands.getstatusoutput('uuidgen -t')[1]
    final_sqlite_file_name = databaseTag + '@' + uuid
    sqlite_file = workingDir + final_sqlite_file_name + ".db"
    metadata_file = workingDir + final_sqlite_file_name + ".txt"

    # Each payload is written to a temporary sqlite file, read back, and
    # then appended to the combined sqlite file with its IOV range.
    for payload in payloadList:
        payloadNumber += 1
        if option.zlarge:
            payload.sigmaZ = 10
            payload.sigmaZerr = 2.5e-05
        tmpFile = file(tmpPayloadFileName,'w')
        dumpValues(payload,tmpFile)
        tmpFile.close()
        if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
            error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
            exit(error)
        readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)

        ##############################################################
        #WARNING I am not sure if I am packing the right values
        if dbIOVBase == "runnumber":
            iov_since = str(payload.Run)
            iov_till = iov_since
        elif dbIOVBase == "lumiid":
            iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
            iov_till = str( pack(int(payload.Run), int(payload.IOVlast)) )
        elif dbIOVBase == "timestamp":
            error = "ERROR: IOV " + dbIOVBase + " still not implemented."
            exit(error)
        else:
            error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
            exit(error)

        # Remember the overall IOV span for the metadata and archive names.
        if payloadNumber == 0:
            iovSinceFirst = iov_since
        if payloadNumber == len(payloadList)-1:
            iovTillLast = iov_till

        appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
        os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)

    #### CREATE payload for merged output

    print " create MERGED payload card for dropbox ..."

    dfile = open(metadata_file,'w')

    dfile.write('destDB ' + destDB +'\n')
    dfile.write('tag ' + databaseTag +'\n')
    dfile.write('inputtag' +'\n')
    dfile.write('since ' + iovSinceFirst +'\n')
    #dfile.write('till ' + iov_till +'\n')
    dfile.write('Timetype '+ dbIOVBase +'\n')

    ###################################################
    # WARNING tagType forced to offline
    print "WARNING TAG TYPE forced to be just offline"
    tagType = "offline"
    checkType = tagType
    if tagType == "express":
        checkType = "hlt"
    dfile.write('IOVCheck ' + checkType + '\n')
    dfile.write('usertext Beam spot position\n')

    dfile.close()

    if option.upload:
        print " scp files to offline Drop Box"
        dropbox = "/DropBox"
        if option.Test:
            dropbox = "/DropBox_test"
            print "UPLOADING TO TEST DB"
        uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)

    # Archive the combined sqlite file, its metadata and the payload text.
    archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
    archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
    if not os.path.isdir(archiveDir + 'payloads'):
        os.mkdir(archiveDir + 'payloads')
    commands.getstatusoutput('mv ' + sqlite_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
    commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
    commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)

    print archiveDir + "payloads/" + archive_sqlite_file_name + '.db'
    print archiveDir + "payloads/" + archive_sqlite_file_name + '.txt'

    # NOTE(review): the lock is released only on this normal path; every
    # exit() above leaves it in place -- confirm that this is intended.
    rmLock()
def BeamSpotWorkflow::removeUncompleteRuns | ( | newRunList, | |
dataSet | |||
) |
Definition at line 443 of file BeamSpotWorkflow.py.
def removeUncompleteRuns(newRunList, dataSet):
    """Report, run by run, whether all DBS files have been processed.

    Counts the result files in newRunList per run and compares each count
    with the number of files DBS lists for that run in dataSet. Despite its
    name the function removes nothing and returns nothing: it only prints
    one status line per run.
    """
    processedRuns = {}
    for fileName in newRunList:
        run = getRunNumberFromFileName(fileName)
        processedRuns[run] = processedRuns.get(run, 0) + 1

    for run in processedRuns.keys():
        nFiles = getNumberOfFilesToProcessForRun(dataSet,run)
        nProcessed = processedRuns[run]
        if nProcessed >= nFiles:
            print("All files have been processed for run: " + str(run) + " (" + str(nProcessed) + " out of " + str(nFiles) + ")")
        else:
            print("I haven't processed all files yet : " + str(nProcessed) + " out of " + str(nFiles) + " for run: " + str(run))
def BeamSpotWorkflow::selectFilesToProcess | ( | listOfRunsAndLumiFromDBS, | |
listOfRunsAndLumiFromRR, | |||
newRunList, | |||
runListDir, | |||
dataSet, | |||
mailList, | |||
dbsTolerance, | |||
dbsTolerancePercent, | |||
rrTolerance, | |||
missingFilesTolerance, | |||
missingLumisTimeout | |||
) |
Definition at line 226 of file BeamSpotWorkflow.py.
def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
    # Decide which result files are complete enough to be turned into
    # payloads. Runs are visited in increasing order; the function returns
    # the files collected so far as soon as a run cannot be accepted, so no
    # run is uploaded before an earlier one.
    #
    # listOfRunsAndLumiFromDBS -- {run: [lumi, ...]} as reported by DBS
    # listOfRunsAndLumiFromRR  -- {run: [[first, last], ...]} lumi ranges
    #                             from the Run Registry ([] = empty range)
    # newRunList               -- result file names found in runListDir
    # dataSet                  -- comma separated dataset names
    # mailList                 -- recipients handed to sendEmail
    # dbsTolerance, dbsTolerancePercent, rrTolerance, missingFilesTolerance,
    # missingLumisTimeout      -- thresholds read from the configuration
    #
    # NOTE(review): this listing was flattened by the documentation
    # extraction; the nesting below was reconstructed from the statement
    # order and the messages -- confirm against the real file, in particular
    # where the "elif len(errors) != 0" branch attaches.
    runsAndLumisProcessed = {}
    runsAndFiles = {}
    # Read every result file and collect the lumis it covers, per run.
    for fileName in newRunList:
        file = open(runListDir+fileName)
        for line in file:
            if line.find("Runnumber") != -1:
                run = long(line.replace('\n','').split(' ')[1])
            elif line.find("LumiRange") != -1:
                lumiLine = line.replace('\n','').split(' ')
                begLumi = long(lumiLine[1])
                endLumi = long(lumiLine[3])
                if begLumi != endLumi:
                    error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
                    exit(error)
                else:
                    if not run in runsAndLumisProcessed:
                        runsAndLumisProcessed[run] = []
                    if begLumi in runsAndLumisProcessed[run]:
                        print "Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!"
                    else:
                        runsAndLumisProcessed[run].append(begLumi)
        # The file is booked under the last run number read from it.
        if not run in runsAndFiles:
            runsAndFiles[run] = []
        runsAndFiles[run].append(fileName)
        file.close()

    rrKeys = listOfRunsAndLumiFromRR.keys()
    rrKeys.sort()
    dbsKeys = listOfRunsAndLumiFromDBS.keys()
    dbsKeys.sort()
    #I remove the last entry from DBS since I am not sure it is an already closed run!
    lastUnclosedRun = dbsKeys.pop()
    procKeys = runsAndLumisProcessed.keys()
    procKeys.sort()
    filesToProcess = []
    for run in rrKeys:
        # Expand the Run Registry ranges of this run into single lumis.
        RRList = []
        for lumiRange in listOfRunsAndLumiFromRR[run]:
            if lumiRange != []:
                for l in range(lumiRange[0],lumiRange[1]+1):
                    RRList.append(long(l))
        if run in procKeys and run < lastUnclosedRun:
            if not run in dbsKeys and run != lastUnclosedRun:
                error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!"
                exit(error)
            print "Working on run " + str(run)
            # Use the file count of the first dataset that knows this run.
            nFiles = 0
            for data in dataSet.split(','):
                nFiles = getNumberOfFilesToProcessForRun(data,run)
                if nFiles != 0:
                    break
            if len(runsAndFiles[run]) < nFiles:
                print "I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run)
                if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
                    # Few files missing: wait for the timeout, then go on.
                    timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
                    timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        print "WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!"
                    else:
                        if timeoutType == -1:
                            print "WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
                        else:
                            print "WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress."
                        return filesToProcess
                else:
                    # Many files missing: never accept the run, only notify
                    # once the timeout has expired.
                    timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
                        sendEmail(mailList,error)
                        return filesToProcess
                    else:
                        if timeoutType == -1:
                            print "WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
                        else:
                            print "WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress."
                        return filesToProcess

            else:
                # All files are there; both timeouts are called without a
                # duration (the same call is commented as a reset above).
                timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
                timeoutManager("DBS_MISMATCH_Run"+str(run))
                print "I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!"
            errors = []
            badProcessed = []
            badDBSProcessed = []
            badDBS = []
            badRRProcessed = []
            badRR = []
            #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
            badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
            for i in range(0,len(errors)):
                errors[i] = errors[i].replace("listA","the processed lumis")
                errors[i] = errors[i].replace("listB","DBS")
            if len(badDBS) != 0:
                print "This is weird because I processed more lumis than the ones that are in DBS!"
            # Lumis missing with respect to DBS are acceptable when they are
            # not requested by the Run Registry either.
            if len(badDBSProcessed) != 0 and run in rrKeys:
                lastError = len(errors)
                #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
                badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
                for i in range(0,len(errors)):
                    errors[i] = errors[i].replace("listA","the processed lumis")
                    errors[i] = errors[i].replace("listB","Run Registry")

                if len(badRRProcessed) != 0:
                    print "I have not processed some of the lumis that are in the run registry for run: " + str(run)
                    # Truly bad lumis are missing from both DBS and RR views.
                    for lumi in badDBSProcessed:
                        if lumi in badRRProcessed:
                            badProcessed.append(lumi)
                    lenA = len(badProcessed)
                    lenB = len(RRList)
                    if 100.*lenA/lenB <= dbsTolerancePercent:
                        print "WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
                        badProcessed = []
                    elif lenA <= dbsTolerance:
                        print "WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
                        badProcessed = []
                    else:
                        error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
                        sendEmail(mailList,error)
                        print error
                        return filesToProcess
                elif len(errors) != 0:
                    print "The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!"

            #If I get here it means that I passed or the DBS or the RR test
            if len(badProcessed) == 0:
                for file in runsAndFiles[run]:
                    filesToProcess.append(file)
            else:
                print "This should never happen because if I have errors I return or exit! Run: " + str(run)
        else:
            error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
            print error
            timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
            if timeoutType == 1:
                # Timeout expired: a short enough run is skipped, anything
                # longer stops the selection here.
                if len(RRList) <= rrTolerance:
                    error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
                    print error
                else:
                    error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
                    sendEmail(mailList,error)
                    return filesToProcess
            else:
                if timeoutType == -1:
                    print "WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!"
                else:
                    print "WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress."
                return filesToProcess
    return filesToProcess
string BeamSpotWorkflow::error = "Please set a crab environment in order to get the proper JSON lib" |
Definition at line 50 of file BeamSpotWorkflow.py.