CMS 3D CMS Logo

Functions | Variables
BeamSpotWorkflow Namespace Reference

Functions

def aselectFilesToProcess (listOfFilesToProcess, newRunList)
 
def compareLumiLists (listA, listB, errors=[], tolerance=0)
 
def getLastClosedRun (DBSListOfFiles)
 
def getLastUploadedIOV (tagName, destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT")
 General functions. More...
 
def getListOfFilesToProcess (dataSet, lastRun=-1)
 
def getListOfRunsAndLumiFromDBS (dataSet, lastRun=-1)
 
def getListOfRunsAndLumiFromFile (firstRun=-1, fileName="")
 
def getListOfRunsAndLumiFromRR (firstRun=-1)
 
def getNewRunList (fromDir, lastUploadedIOV)
 
def getNumberOfFilesToProcessForRun (dataSet, run)
 
def getRunNumberFromDBSName (fileName)
 
def getRunNumberFromFileName (fileName)
 
def main ()
 
def removeUncompleteRuns (newRunList, dataSet)
 
def selectFilesToProcess (listOfRunsAndLumiFromDBS, listOfRunsAndLumiFromRR, newRunList, runListDir, dataSet, mailList, dbsTolerance, dbsTolerancePercent, rrTolerance, missingFilesTolerance, missingLumisTimeout)
 

Variables

 error
 

Function Documentation

def BeamSpotWorkflow.aselectFilesToProcess (   listOfFilesToProcess,
  newRunList 
)

Definition at line 490 of file BeamSpotWorkflow.py.

References cmsRelvalreport.exit, getLastClosedRun(), getRunNumberFromDBSName(), getRunNumberFromFileName(), edm.print(), and str.

def aselectFilesToProcess(listOfFilesToProcess,newRunList):
    """Select the result files of every closed run that has been fully processed.

    listOfFilesToProcess -- DBS file names; the run of each one is extracted
                            with getRunNumberFromDBSName().
    newRunList           -- result file names; the run of each one is extracted
                            with getRunNumberFromFileName().

    Returns the entries of newRunList whose run is not newer than the run
    returned by getLastClosedRun() and for which the number of result files
    equals the number of DBS files.

    Calls exit() when a result file belongs to a run that DBS does not list,
    or when the two file counts of a closed run differ.
    """
    # Number of files DBS lists for each run.
    runsToProcess = {}
    for dbsName in listOfFilesToProcess:
        run = getRunNumberFromDBSName(dbsName)
        runsToProcess[run] = runsToProcess.get(run, 0) + 1

    # Number of result files available for each run.
    processedRuns = {}
    for fileName in newRunList:
        run = getRunNumberFromFileName(fileName)
        processedRuns[run] = processedRuns.get(run, 0) + 1

    #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
    lastClosedRun = getLastClosedRun(listOfFilesToProcess)

    selectedFiles = []
    for run in sorted(processedRuns):
        if run > lastClosedRun:
            continue
        # This check has to come before anything indexes runsToProcess[run]:
        # the status message below would otherwise die with a KeyError and
        # the explanatory error would never be shown.
        if run not in runsToProcess:
            exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
        print("For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!")
        # NOTE(review): the original also called getDBSLumiListForRun(run) here
        # and discarded the result; that name is not defined in this module's
        # function list, so the call was dropped.
        if processedRuns[run] != runsToProcess[run]:
            exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
        for fileName in newRunList:
            if run == getRunNumberFromFileName(fileName):
                selectedFiles.append(fileName)
    return selectedFiles
529 
def getRunNumberFromFileName(fileName)
def getLastClosedRun(DBSListOfFiles)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def getRunNumberFromDBSName(fileName)
def aselectFilesToProcess(listOfFilesToProcess, newRunList)
#define str(s)
def BeamSpotWorkflow.compareLumiLists (   listA,
  listB,
  errors = [],
  tolerance = 0 
)

Definition at line 435 of file BeamSpotWorkflow.py.

References objects.autophobj.float, and str.

Referenced by selectFilesToProcess().

def compareLumiLists(listA,listB,errors=None,tolerance=0):
    """Compare two lists of lumi sections and report what each one is missing.

    listA, listB -- lists of lumi sections; both are sorted IN PLACE.
    errors       -- optional list that receives one message per problem found.
                    A fresh list is used when omitted, so messages no longer
                    pile up in a default list shared by every call.
    tolerance    -- percentage of len(listB) by which listA may be shorter
                    before the length mismatch itself is reported.

    Returns the tuple (badA, badB): badA holds the entries of listB that are
    absent from listA, badB the entries of listA that are absent from listB.
    """
    if errors is None:
        errors = []

    lenA = len(listA)
    lenB = len(listB)
    # Only a listA that is too SHORT is reported; a longer listA is accepted.
    if lenA < lenB-(lenB*float(tolerance)/100):
        errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")

    listA.sort()
    listB.sort()

    badA = []
    badB = []
    for lumi in listA:
        if lumi not in listB:
            errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
            badB.append(lumi)
    for lumi in listB:
        if lumi not in listA:
            errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
            badA.append(lumi)

    return badA,badB
472 
def compareLumiLists(listA, listB, errors=[], tolerance=0)
#define str(s)
def BeamSpotWorkflow.getLastClosedRun (   DBSListOfFiles)

Definition at line 218 of file BeamSpotWorkflow.py.

References getRunNumberFromDBSName().

Referenced by aselectFilesToProcess().

def getLastClosedRun(DBSListOfFiles):
    """Return the second-highest distinct run number found in DBSListOfFiles.

    The run of every entry is extracted with getRunNumberFromDBSName().
    When fewer than two distinct runs are present there is no closed run
    and -1 is returned.
    """
    distinctRuns = []
    for dbsName in DBSListOfFiles:
        number = getRunNumberFromDBSName(dbsName)
        if number not in distinctRuns:
            distinctRuns.append(number)

    if len(distinctRuns) > 1:
        # The highest run is skipped; the one just below it is returned.
        return long(sorted(distinctRuns)[-2])
    #No closed run
    return -1
230 
def getLastClosedRun(DBSListOfFiles)
def getRunNumberFromDBSName(fileName)
def BeamSpotWorkflow.getLastUploadedIOV (   tagName,
  destDB = "oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT" 
)

General functions.

Definition at line 57 of file BeamSpotWorkflow.py.

References cmsRelvalreport.exit, spr.find(), and edm.print().

Referenced by main().

def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
    """Return the last IOV listed by cmscond_list_iov for tagName in destDB.

    Returns 1 when the command fails with a message saying that the metadata
    entry for the tag does not exist. Calls exit() on any other command
    failure, or when the tag exists but no IOV value can be extracted.
    """
    listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName
    status, listing = commands.getstatusoutput( listIOVCommand )
    if status != 0 :
        missingTag = "metadata entry \"" + tagName + "\" does not exist"
        if missingTag in listing:
            print("Creating a new tag because I got the following error contacting the DB")
            print(listing)
            return 1
        exit("ERROR: Can\'t connect to db because:\n" + listing)

    # Keep only the first column of the last line that contains "DB=".
    aCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
    lastIOV = commands.getstatusoutput( aCommand )[1]

    #WARNING when we pass to lumi IOV this should be long long
    if lastIOV == '':
        exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")

    return long(lastIOV)
79 
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
def getLastUploadedIOV(tagName, destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT")
General functions.
def BeamSpotWorkflow.getListOfFilesToProcess (   dataSet,
  lastRun = -1 
)

Definition at line 81 of file BeamSpotWorkflow.py.

References split, and str.

def getListOfFilesToProcess(dataSet,lastRun=-1):
    """Return the lines containing ".root" from a DBS file query for dataSet.

    When lastRun is not -1 the query is restricted to runs greater than
    lastRun. The exit status of the command is not checked.
    """
    runFilter = ""
    if lastRun != -1:
        runFilter = " and run > " + str(lastRun)
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + runFilter + "\" | grep .root"
    commandOutput = commands.getstatusoutput( queryCommand )[1]
    return commandOutput.split('\n')
89 
def getListOfFilesToProcess(dataSet, lastRun=-1)
#define str(s)
double split
Definition: MVATrainer.cc:139
def BeamSpotWorkflow.getListOfRunsAndLumiFromDBS (   dataSet,
  lastRun = -1 
)

Definition at line 101 of file BeamSpotWorkflow.py.

References mps_setup.append, cmsRelvalreport.exit, spr.find(), edm.print(), split, and str.

Referenced by main().

def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
    """Query DBS for the run and lumi section pairs of one or more datasets.

    dataSet -- one dataset name or several separated by commas; each one is
               queried separately and the results are merged.
    lastRun -- when not -1, only runs greater than this value are requested.

    Returns a dictionary that maps each run number to the list of its lumi
    sections, in the order DBS printed them.

    Calls exit() when the last attempt for a dataset ends with a non-zero
    exit status.
    """
    outputList = []
    for data in dataSet.split(','):
        queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
        if lastRun != -1:
            queryCommand = queryCommand + " and run > " + str(lastRun)
        queryCommand += "\""
        print(" >> " + queryCommand)
        output = []
        # Up to three attempts; an attempt counts as good only when the exit
        # status is 0 and the text mentions neither "ERROR" nor "Error".
        for i in range(0,3):
            output = commands.getstatusoutput( queryCommand )
            if output[0] == 0 and "ERROR" not in output[1] and "Error" not in output[1]:
                break
        # NOTE(review): if every attempt returns status 0 together with an
        # error text, that text is still parsed below -- confirm this is wanted.
        if output[0] != 0:
            exit("ERROR: I can't contact DBS for the following reason:\n" + output[1])
        outputList.extend(output[1].split('\n'))

    # Raw string: \d and \s are regex escapes, not string escapes.
    runLumiPattern = re.compile(r'(\d+)\s+(\d+)')
    runsAndLumis = {}
    for out in outputList:
        regExp = runLumiPattern.search(out)
        if regExp:
            run = long(regExp.group(1))
            lumi = long(regExp.group(2))
            runsAndLumis.setdefault(run, []).append(lumi)

    return runsAndLumis
134 
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
def getListOfRunsAndLumiFromDBS(dataSet, lastRun=-1)
#define str(s)
double split
Definition: MVATrainer.cc:139
def BeamSpotWorkflow.getListOfRunsAndLumiFromFile (   firstRun = -1,
  fileName = "" 
)

Definition at line 136 of file BeamSpotWorkflow.py.

Referenced by main().

def getListOfRunsAndLumiFromFile(firstRun=-1,fileName=""):
    """Load a JSON file keyed by run number and return it with integer keys.

    firstRun -- accepted for symmetry with the other getListOfRunsAndLumi*
                functions; it is not used to filter the result.
    fileName -- path of the JSON file to read.

    Returns a dictionary that maps int(run) to the value stored under that
    run in the file.
    """
    # The context manager closes the file even if reading or parsing fails.
    with open(fileName) as jsonFile:
        jsonList = json.load(jsonFile)

    selected_dcs = {}
    for element in jsonList:
        # int() promotes to long automatically on Python 2, so the keys
        # compare and hash exactly like the former long() ones.
        selected_dcs[int(element)] = jsonList[element]
    return selected_dcs
146 
def getListOfRunsAndLumiFromFile(firstRun=-1, fileName="")
def BeamSpotWorkflow.getListOfRunsAndLumiFromRR (   firstRun = -1)

Definition at line 148 of file BeamSpotWorkflow.py.

References edm.print(), and str.

Referenced by main().

def getListOfRunsAndLumiFromRR(firstRun=-1):
    """Ask the Run Registry for the runs after firstRun and their lumi ranges.

    Two XML-RPC exports are made: first the list of runs of group
    "Collisions10" with a run number greater than firstRun, then the
    RUNLUMISECTION table restricted to those runs and to the rows where
    all the listed parDcs* flags are 1.

    Returns a dictionary that maps each run number to the value the second
    export holds for it, or to [[]] when the run is absent from that export.
    Returns {} when either export still fails after three attempts, or when
    the first export contains no run at all.
    """
    RunReg = "http://pccmsdqm04.cern.ch/runregistry"
    Group = "Collisions10"
    maxAttempts = 3

    # get handler to RR XML-RPC server
    server = xmlrpclib.ServerProxy(RunReg + "/xmlrpc")

    def exportWithRetry(table, template, selection, failureMessage):
        """Try the export maxAttempts times, 2 s apart; None when all fail."""
        for tries in range(maxAttempts):
            try:
                return server.DataExporter.export(table, 'GLOBAL', template, selection)
            except Exception:
                # Not a bare except: Ctrl-C and exit() must not be retried.
                print(failureMessage, tries, "/", maxAttempts)
                time.sleep(2)
        return None

    sel_runtable = "{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun)
    run_data = exportWithRetry('RUN', 'csv_runs', sel_runtable,
                               "Something wrong in accessing runregistry, retrying in 2s....")
    if run_data is None:
        return {}

    # The run number is the first comma-separated field; other rows are skipped.
    listOfRuns = []
    for line in run_data.split("\n"):
        run = line.split(',')[0]
        if run.isdigit():
            listOfRuns.append(run)

    if not listOfRuns:
        # Indexing the empty list below would raise an IndexError.
        print("WARNING: Run Registry returned no runs after " + str(firstRun))
        return {}

    # presumably the export lists runs newest first, which is why the range
    # is taken from the last and the first row -- TODO confirm
    firstRun = listOfRuns[-1]
    lastRun = listOfRuns[0]
    sel_dcstable = "{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"

    dcs_data = exportWithRetry('RUNLUMISECTION', 'json', sel_dcstable,
                               "I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....")
    if dcs_data is None:
        return {}

    selected_dcs = {}
    jsonList = json.loads(dcs_data)
    for element in listOfRuns:
        if element in jsonList:
            selected_dcs[long(element)] = jsonList[element]
        else:
            print("WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!")
            selected_dcs[long(element)] = [[]]
    return selected_dcs
216 
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def getListOfRunsAndLumiFromRR(firstRun=-1)
#define str(s)
def BeamSpotWorkflow.getNewRunList (   fromDir,
  lastUploadedIOV 
)

Definition at line 247 of file BeamSpotWorkflow.py.

References getRunNumberFromFileName(), and eostools.ls().

Referenced by main().

def getNewRunList(fromDir,lastUploadedIOV):
    """Return the ".txt" files of fromDir whose run is newer than lastUploadedIOV.

    The files are listed with ls(fromDir, ".txt") and the run of each one is
    extracted with getRunNumberFromFileName(); the listing order is kept.
    """
    return [fileName for fileName in ls(fromDir,".txt")
            if getRunNumberFromFileName(fileName) > lastUploadedIOV]
256 
def getRunNumberFromFileName(fileName)
def getNewRunList(fromDir, lastUploadedIOV)
def ls(path, rec=False)
Definition: eostools.py:349
def BeamSpotWorkflow.getNumberOfFilesToProcessForRun (   dataSet,
  run 
)

Definition at line 91 of file BeamSpotWorkflow.py.

References split, and str.

Referenced by removeUncompleteRuns(), and selectFilesToProcess().

92  queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
93  #print " >> " + queryCommand
94  output = commands.getstatusoutput( queryCommand )
95  if output[0] != 0:
96  return 0
97  else:
98  return len(output[1].split('\n'))
99 
def getNumberOfFilesToProcessForRun(dataSet, run)
#define str(s)
double split
Definition: MVATrainer.cc:139
def BeamSpotWorkflow.getRunNumberFromDBSName (   fileName)

Definition at line 240 of file BeamSpotWorkflow.py.

Referenced by aselectFilesToProcess(), and getLastClosedRun().

241  regExp = re.search('(\D+)/(\d+)/(\d+)/(\d+)/(\D+)',fileName)
242  if not regExp:
243  return -1
244  return long(regExp.group(3)+regExp.group(4))
245 
def getRunNumberFromDBSName(fileName)
def BeamSpotWorkflow.getRunNumberFromFileName (   fileName)

Definition at line 232 of file BeamSpotWorkflow.py.

Referenced by aselectFilesToProcess(), getNewRunList(), and removeUncompleteRuns().

233 # regExp = re.search('(\D+)_(\d+)_(\d+)_(\d+)',fileName)
234  regExp = re.search('(\D+)_(\d+)_(\d+)_',fileName)
235  if not regExp:
236  return -1
237  return long(regExp.group(3))
238 
def getRunNumberFromFileName(fileName)
def BeamSpotWorkflow.main ( )

Definition at line 531 of file BeamSpotWorkflow.py.

References CommonMethods.appendSqliteFile(), CommonMethods.checkLock(), SimDataFormats::CaloAnalysis.cp, CommonMethods.createWeightedPayloads(), CommonMethods.dirExists(), CommonMethods.dumpValues(), cmsRelvalreport.exit, FrontierConditions_GlobalTag_cff.file, objects.autophobj.float, getLastUploadedIOV(), getListOfRunsAndLumiFromDBS(), getListOfRunsAndLumiFromFile(), getListOfRunsAndLumiFromRR(), getNewRunList(), createfilelist.int, CommonMethods.lock(), timeUnitHelper.pack(), dumpparser.parse(), edm.print(), CommonMethods.readBeamSpotFile(), CommonMethods.readSqliteFile(), CommonMethods.rmLock(), selectFilesToProcess(), CommonMethods.sendEmail(), CommonMethods.setLockName(), CommonMethods.sortAndCleanBeamList(), str, timeUnitHelper.unpackLumiid(), CommonMethods.uploadSqliteFile(), and CommonMethods.writeSqliteFile().

def main():
    """Run the whole beam spot workflow once.

    Steps, in order: parse the command line, take the lock if requested,
    read the configuration file, prepare the source/archive/working
    directories, find the last uploaded IOV, copy the new result files to
    the archive, cross-check them against DBS and the Run Registry (or the
    JSON file when the registry gives nothing), build the weighted payloads,
    write one sqlite file per payload and append it to a combined file,
    write the metadata card, optionally upload, and archive the outputs.

    Every fatal condition ends in exit().
    NOTE(review): rmLock() is only reached on the successful path, so a lock
    taken at the start stays in place after any exit() -- confirm that this
    is intended.
    """
    ######### COMMAND LINE OPTIONS ##############
    option,args = parse(__doc__)

    ######### Check if there is already a megascript running ########
    if option.lock:
        setLockName('.' + option.lock)
        if checkLock():
            print("There is already a megascript runnning...exiting")
            return
        else:
            lock()

    destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
    if option.Test:
        destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'

    ######### CONFIGURATION FILE ################
    cfgFile = "BeamSpotWorkflow.cfg"
    if option.cfg:
        cfgFile = option.cfg
    # Without this check an unset CMSSW_BASE surfaces as an opaque TypeError
    # from the string concatenation below.
    cmsswBase = os.getenv("CMSSW_BASE")
    if cmsswBase is None:
        exit("ERROR: CMSSW_BASE is not set, I can't locate the BeamSpotProducer configuration and templates")
    configurationFile = cmsswBase + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
    configuration = ConfigParser.ConfigParser()
    print('Reading configuration from ', configurationFile)
    configuration.read(configurationFile)

    sourceDir = configuration.get('Common','SOURCE_DIR')
    archiveDir = configuration.get('Common','ARCHIVE_DIR')
    workingDir = configuration.get('Common','WORKING_DIR')
    databaseTag = configuration.get('Common','DBTAG')
    dataSet = configuration.get('Common','DATASET')
    fileIOVBase = configuration.get('Common','FILE_IOV_BASE')
    dbIOVBase = configuration.get('Common','DB_IOV_BASE')
    dbsTolerance = float(configuration.get('Common','DBS_TOLERANCE'))
    dbsTolerancePercent = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
    rrTolerance = float(configuration.get('Common','RR_TOLERANCE'))
    missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
    missingLumisTimeout = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
    jsonFileName = configuration.get('Common','JSON_FILE')
    mailList = configuration.get('Common','EMAIL')

    ######### DIRECTORIES SETUP #################
    # Indexing (rather than endswith) is kept on purpose: an empty directory
    # setting must keep failing here instead of silently becoming "/".
    if sourceDir[-1] != '/':
        sourceDir = sourceDir + '/'
    if not dirExists(sourceDir):
        error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
        sendEmail(mailList,error)
        exit(error)

    if archiveDir[-1] != '/':
        archiveDir = archiveDir + '/'
    if not os.path.isdir(archiveDir):
        os.mkdir(archiveDir)

    if workingDir[-1] != '/':
        workingDir = workingDir + '/'
    if not os.path.isdir(workingDir):
        os.mkdir(workingDir)
    else:
        # Start from an empty working directory.
        os.system("rm -f "+ workingDir + "*")

    print("Getting last IOV for tag: " + databaseTag)
    lastUploadedIOV = 1
    if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT":
        # The production case relies on getLastUploadedIOV's own default DB.
        lastUploadedIOV = getLastUploadedIOV(databaseTag)
    else:
        lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)

    if dbIOVBase == "lumiid":
        lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]

    ######### Get list of files processed after the last IOV
    print("Getting list of files processed after IOV " + str(lastUploadedIOV))
    newProcessedRunList = getNewRunList(sourceDir,lastUploadedIOV)
    if len(newProcessedRunList) == 0:
        exit("There are no new runs after " + str(lastUploadedIOV))

    ######### Copy files to archive directory
    print("Copying files to archive directory")
    copiedFiles = []
    for i in range(3):
        copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)
        if len(copiedFiles) == len(newProcessedRunList):
            break
    if len(copiedFiles) != len(newProcessedRunList):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList))
        sendEmail(mailList,error)
        exit(error)

    ######### Get from DBS the list of files after last IOV
    print("Getting list of files from DBS")
    listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
    if len(listOfRunsAndLumiFromDBS) == 0:
        exit("There are no files in DBS to process")
    print("Getting list of files from RR")
    listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromRR(lastUploadedIOV)
    if(not listOfRunsAndLumiFromRR):
        print("Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName)
        listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName)

    ######### Get list of files to process for DB
    print("Getting list of files to process")
    selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
    if len(selectedFilesToProcess) == 0:
        exit("There are no files to process")

    ######### Copy files to working directory
    print("Copying files from archive to working directory")
    copiedFiles = []
    for i in range(3):
        copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)
        if len(copiedFiles) == len(selectedFilesToProcess):
            break
        else:
            commands.getstatusoutput("rm -rf " + workingDir)
    if len(copiedFiles) != len(selectedFilesToProcess):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir
        sendEmail(mailList,error)
        exit(error)

    print("Sorting and cleaning beamlist")
    beamSpotObjList = []
    for fileName in copiedFiles:
        readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)

    sortAndCleanBeamList(beamSpotObjList,fileIOVBase)

    if len(beamSpotObjList) == 0:
        error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
        exit(error)

    payloadFileName = "PayloadFile.txt"

    runBased = False
    if dbIOVBase == "runnumber":
        runBased = True

    payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
    if len(payloadList) == 0:
        error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
        exit(error)

    tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
    tmpSqliteFileName = workingDir + "SingleTmpSqliteFile.db"

    writeDBTemplate = cmsswBase + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
    readDBTemplate = cmsswBase + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
    payloadNumber = -1
    iovSinceFirst = '0'
    iovTillLast = '0'

    #Creating the final name for the combined sqlite file
    uuid = commands.getstatusoutput('uuidgen -t')[1]
    final_sqlite_file_name = databaseTag + '@' + uuid
    sqlite_file = workingDir + final_sqlite_file_name + ".db"
    metadata_file = workingDir + final_sqlite_file_name + ".txt"

    for payload in payloadList:
        payloadNumber += 1
        if option.zlarge:
            payload.sigmaZ = 10
            payload.sigmaZerr = 2.5e-05
        # open() instead of the Python-2-only file() builtin; the context
        # manager closes the handle even if dumpValues raises.
        with open(tmpPayloadFileName,'w') as tmpFile:
            dumpValues(payload,tmpFile)
        if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
            error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
            exit(error)
        readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)

        ##############################################################
        #WARNING I am not sure if I am packing the right values
        if dbIOVBase == "runnumber":
            iov_since = str(payload.Run)
            iov_till = iov_since
        elif dbIOVBase == "lumiid":
            iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
            iov_till = str( pack(int(payload.Run), int(payload.IOVlast)) )
        elif dbIOVBase == "timestamp":
            error = "ERROR: IOV " + dbIOVBase + " still not implemented."
            exit(error)
        else:
            error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
            exit(error)

        # Remember the overall IOV range covered by the combined file.
        if payloadNumber == 0:
            iovSinceFirst = iov_since
        if payloadNumber == len(payloadList)-1:
            iovTillLast = iov_till

        appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
        os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)

    #### CREATE payload for merged output
    print(" create MERGED payload card for dropbox ...")

    with open(metadata_file,'w') as dfile:
        dfile.write('destDB ' + destDB +'\n')
        dfile.write('tag ' + databaseTag +'\n')
        dfile.write('inputtag' +'\n')
        dfile.write('since ' + iovSinceFirst +'\n')
        dfile.write('Timetype '+ dbIOVBase +'\n')

        ###################################################
        # WARNING tagType forced to offline
        print("WARNING TAG TYPE forced to be just offline")
        tagType = "offline"
        checkType = tagType
        if tagType == "express":
            checkType = "hlt"
        dfile.write('IOVCheck ' + checkType + '\n')
        dfile.write('usertext Beam spot position\n')

    if option.upload:
        print(" scp files to offline Drop Box")
        dropbox = "/DropBox"
        if option.Test:
            dropbox = "/DropBox_test"
            print("UPLOADING TO TEST DB")
        uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)

    # Move the combined sqlite file and its metadata card to the archive and
    # keep a copy of the payload text file next to them.
    archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
    archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
    if not os.path.isdir(archiveDir + 'payloads'):
        os.mkdir(archiveDir + 'payloads')
    commands.getstatusoutput('mv ' + sqlite_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
    commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
    commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)

    print(archiveDir + "payloads/" + archive_sqlite_file_name + '.db')
    print(archiveDir + "payloads/" + archive_sqlite_file_name + '.txt')

    rmLock()
784 
def dirExists(dir)
def pack(high, low)
def readBeamSpotFile(fileName, listbeam=[], IOVbase="runbase", firstRun='1', lastRun='4999999999')
def dumpValues(beam, file)
def setLockName(name)
def appendSqliteFile(combinedSqliteFileName, sqliteFileName, tagName, IOVSince, IOVTill, tmpDir="/tmp/")
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def createWeightedPayloads(fileName, listbeam=[], weighted=True)
CREATE FILE FOR PAYLOADS.
def uploadSqliteFile(sqliteFileDirName, sqliteFileName, dropbox="/DropBox")
def getLastUploadedIOV(tagName, destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT")
General functions.
def getListOfRunsAndLumiFromFile(firstRun=-1, fileName="")
def sortAndCleanBeamList(listbeam=[], IOVbase="lumibase")
Sort and clean list of data for consecutive duplicates and bad fits.
def getNewRunList(fromDir, lastUploadedIOV)
def readSqliteFile(sqliteFileName, tagName, sqliteTemplateFile, tmpDir="/tmp/")
def parse(path, config)
Definition: dumpparser.py:13
def getListOfRunsAndLumiFromDBS(dataSet, lastRun=-1)
def selectFilesToProcess(listOfRunsAndLumiFromDBS, listOfRunsAndLumiFromRR, newRunList, runListDir, dataSet, mailList, dbsTolerance, dbsTolerancePercent, rrTolerance, missingFilesTolerance, missingLumisTimeout)
def writeSqliteFile(sqliteFileName, tagName, timeType, beamSpotFile, sqliteTemplateFile, tmpDir="/tmp/")
def sendEmail(mailList, error)
General utilities.
def getListOfRunsAndLumiFromRR(firstRun=-1)
#define str(s)
def BeamSpotWorkflow.removeUncompleteRuns (   newRunList,
  dataSet 
)

Definition at line 474 of file BeamSpotWorkflow.py.

References getNumberOfFilesToProcessForRun(), getRunNumberFromFileName(), edm.print(), and str.

def removeUncompleteRuns(newRunList,dataSet):
    """Print, run by run, whether every file of the run has been processed.

    The files of newRunList are counted per run and each count is compared
    with getNumberOfFilesToProcessForRun(dataSet, run). Despite its name the
    function removes nothing and returns nothing; it only prints.
    """
    filesPerRun = {}
    for name in newRunList:
        runNumber = getRunNumberFromFileName(name)
        filesPerRun[runNumber] = filesPerRun.get(runNumber, 0) + 1

    for runNumber, nProcessed in filesPerRun.items():
        nFiles = getNumberOfFilesToProcessForRun(dataSet,runNumber)
        if nProcessed >= nFiles:
            print("All files have been processed for run: " + str(runNumber) + " (" + str(nProcessed) + " out of " + str(nFiles) + ")")
        else:
            print("I haven't processed all files yet : " + str(nProcessed) + " out of " + str(nFiles) + " for run: " + str(runNumber))
488 
def getRunNumberFromFileName(fileName)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def getNumberOfFilesToProcessForRun(dataSet, run)
def removeUncompleteRuns(newRunList, dataSet)
#define str(s)
def BeamSpotWorkflow.selectFilesToProcess (   listOfRunsAndLumiFromDBS,
  listOfRunsAndLumiFromRR,
  newRunList,
  runListDir,
  dataSet,
  mailList,
  dbsTolerance,
  dbsTolerancePercent,
  rrTolerance,
  missingFilesTolerance,
  missingLumisTimeout 
)

Definition at line 258 of file BeamSpotWorkflow.py.

References mps_setup.append, compareLumiLists(), cmsRelvalreport.exit, getNumberOfFilesToProcessForRun(), edm.print(), python.rootplot.root2matplotlib.replace(), CommonMethods.sendEmail(), split, str, and CommonMethods.timeoutManager().

Referenced by main().

# selectFilesToProcess: decide which of the already produced run-list files can be
# handed on. Run Registry runs are visited in ascending order; the scan stops (returning
# the files accepted so far) at the first run that cannot be validated yet.
#
# NOTE(review): this listing lost its indentation, so the nesting described in the
# comments below is inferred from the logic - confirm against BeamSpotWorkflow.py.
#
# Parameters, as used in the body:
#   listOfRunsAndLumiFromDBS - mapping indexed by run; its keys are sorted and each value
#                              is passed to compareLumiLists as the DBS lumi list.
#   listOfRunsAndLumiFromRR  - mapping indexed by run; each value is iterated as a list of
#                              lumi ranges, expanded from range[0] to range[1] inclusive.
#   newRunList               - file names; each one is opened as runListDir + fileName.
#   runListDir               - string prepended to every file name.
#   dataSet                  - comma-separated dataset names.
#   mailList                 - forwarded to sendEmail together with the error text.
#   dbsTolerance             - largest number of unprocessed lumis that is tolerated.
#   dbsTolerancePercent      - largest percentage of unprocessed lumis (relative to the
#                              Run Registry lumis of the run) that is tolerated.
#   rrTolerance              - largest number of Run Registry lumis a never-processed run
#                              may have and still be skipped once its timeout expired.
#   missingFilesTolerance    - largest number of missing files handled by the
#                              DBS_MISMATCH timeout instead of DBS_VERY_BIG_MISMATCH.
#   missingLumisTimeout      - timeout passed to timeoutManager; reported in hours as
#                              missingLumisTimeout/3600, so presumably seconds - confirm.
# Returns: filesToProcess, the list of file names accepted so far.
# Calls exit(error) when a LumiRange line spans more than one lumi, or when a processed
# Run Registry run is not found in DBS.
258 def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
259  runsAndLumisProcessed = {}
260  runsAndFiles = {}
# First pass: read every run-list file, collecting the processed lumis per run
# (runsAndLumisProcessed) and the file names per run (runsAndFiles).
261  for fileName in newRunList:
262  file = open(runListDir+fileName)
263  for line in file:
264  if line.find("Runnumber") != -1:
# The run number is the second space-separated token of the "Runnumber" line.
# NOTE(review): long is a Python 2 builtin; confirm it is defined if this runs on Python 3.
265  run = long(line.replace('\n','').split(' ')[1])
266  elif line.find("LumiRange") != -1:
267  lumiLine = line.replace('\n','').split(' ')
268  begLumi = long(lumiLine[1])
269  endLumi = long(lumiLine[3])
# Every LumiRange line must describe exactly one lumi (tokens 1 and 3 equal).
270  if begLumi != endLumi:
271  error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
272  exit(error)
273  else:
274  if not run in runsAndLumisProcessed:
275  runsAndLumisProcessed[run] = []
276  if begLumi in runsAndLumisProcessed[run]:
277  print("Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!")
278  else:
279  runsAndLumisProcessed[run].append(begLumi)
# NOTE(review): presumably the next four lines run once per file, after the line loop,
# using the last run number parsed from that file - confirm the nesting in the source.
280  if not run in runsAndFiles:
281  runsAndFiles[run] = []
282  runsAndFiles[run].append(fileName)
283  file.close()
284 
285  rrKeys = sorted(listOfRunsAndLumiFromRR.keys())
# NOTE(review): calling .sort() on the result of dict.keys() only works where keys()
# returns a list (Python 2); rrKeys just above already uses sorted().
286  dbsKeys = listOfRunsAndLumiFromDBS.keys()
287  dbsKeys.sort()
288  #I remove the last entry from DBS since I am not sure it is an already closed run!
289  lastUnclosedRun = dbsKeys.pop()
290  #print "Last unclosed run: " + str(lastUnclosedRun)
291  procKeys = runsAndLumisProcessed.keys()
292  procKeys.sort()
293  #print "Run Registry:"
294  #print rrKeys
295  #print "DBS:"
296  #print dbsKeys
297  #print "List:"
298  #print procKeys
299  #print lastUnclosedRun
300  filesToProcess = []
# Second pass: validate the Run Registry runs in ascending order. Every early return
# below hands back only the files accepted for the runs validated before it.
301  for run in rrKeys:
# Expand the Run Registry lumi ranges of this run into a flat list (both ends inclusive).
302  RRList = []
303  for lumiRange in listOfRunsAndLumiFromRR[run]:
304  if lumiRange != []:
305  for l in range(lumiRange[0],lumiRange[1]+1):
306  RRList.append(long(l))
# Only runs that were processed and are older than the last (possibly open) DBS run
# are validated here; every other run goes to the final else branch.
307  if run in procKeys and run < lastUnclosedRun:
308  #print "run " + str(run) + " is in procKeys"
# NOTE(review): if this test is nested in the one above (as it appears), the
# `run != lastUnclosedRun` part is always true because run < lastUnclosedRun.
309  if not run in dbsKeys and run != lastUnclosedRun:
310  error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!"
311  exit(error)
312  print("Working on run " + str(run))
# Use the file count of the first dataset in the comma-separated list that reports
# a nonzero number of files for this run.
313  nFiles = 0
314  for data in dataSet.split(','):
315  nFiles = getNumberOfFilesToProcessForRun(data,run)
316  if nFiles != 0:
317  break
318  if len(runsAndFiles[run]) < nFiles:
319  print("I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run))
# Few files missing (<= missingFilesTolerance): reset the big-mismatch timeout and
# start or check the DBS_MISMATCH one.
320  if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
321  timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
322  timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
# Per the messages printed below: 1 = a previously set timeout has expired,
# -1 = the timeout is being set now, any other value = the timeout is still running.
323  if timeoutType == 1:
324  print("WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!")
325  else:
326  if timeoutType == -1:
327  print("WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!")
328  else:
329  print("WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress.")
330  return filesToProcess
# Too many files missing: same timeout scheme, but an expired timeout is reported
# by e-mail and stops the scan.
331  else:
332  timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
333  if timeoutType == 1:
334  error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
335  sendEmail(mailList,error)
336  return filesToProcess
337  #exit(error)
338  else:
339  if timeoutType == -1:
340  print("WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!")
341  else:
342  print("WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress.")
343  return filesToProcess
344 
# No files missing: both timeouts are called without a timeout value, the same form
# that is annotated as "resetting this timeout" earlier in this function.
345  else:
346  timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
347  timeoutManager("DBS_MISMATCH_Run"+str(run))
348  print("I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!")
349  errors = []
350  badProcessed = []
351  badDBSProcessed = []
352  badDBS = []
353  badRRProcessed = []
354  badRR = []
355  #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
356  badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
# Rewrite the generic list names in the collected error messages.
357  for i in range(0,len(errors)):
358  errors[i] = errors[i].replace("listA","the processed lumis")
359  errors[i] = errors[i].replace("listB","DBS")
360  #print errors
361  #print badProcessed
362  #print badDBS
363  #exit("ciao")
364  if len(badDBS) != 0:
365  print("This is weird because I processed more lumis than the ones that are in DBS!")
# The processed lumis are compared with the Run Registry only when the DBS comparison
# returned a non-empty badDBSProcessed.
366  if len(badDBSProcessed) != 0 and run in rrKeys:
# NOTE(review): lastError is assigned here but not read anywhere in this function.
367  lastError = len(errors)
368  #print RRList
369  #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
370  badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
# This loop revisits every entry of errors; the entries already rewritten for DBS no
# longer contain "listA"/"listB", so only the newly added ones change.
371  for i in range(0,len(errors)):
372  errors[i] = errors[i].replace("listA","the processed lumis")
373  errors[i] = errors[i].replace("listB","Run Registry")
374  #print errors
375  #print badProcessed
376  #print badRunRegistry
377 
378  if len(badRRProcessed) != 0:
379  print("I have not processed some of the lumis that are in the run registry for run: " + str(run))
# badProcessed collects the lumis flagged by both the DBS and the Run Registry comparison.
380  for lumi in badDBSProcessed:
381  if lumi in badRRProcessed:
382  badProcessed.append(lumi)
383  lenA = len(badProcessed)
384  lenB = len(RRList)
# Accept the run when the flagged lumis are within the percentage tolerance or, failing
# that, within the absolute tolerance; otherwise notify by e-mail and stop the scan.
# NOTE(review): lenB == 0 would raise ZeroDivisionError - confirm RRList cannot be empty here.
385  if 100.*lenA/lenB <= dbsTolerancePercent:
386  print("WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis")
387  #print errors
388  badProcessed = []
389  elif lenA <= dbsTolerance:
390  print("WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis")
391  #print errors
392  badProcessed = []
393  else:
394  error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
395  sendEmail(mailList,error)
396  print(error)
397  return filesToProcess
398  #exit(errors)
399  #return filesToProcess
# NOTE(review): which `if` this elif belongs to cannot be determined from this flattened
# listing - check the indentation in the source file.
400  elif len(errors) != 0:
401  print("The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!")
402  #print errors
403 
404  #If I get here it means that I passed or the DBS or the RR test
# Run accepted: all of its files are added to the result.
405  if len(badProcessed) == 0:
406  for file in runsAndFiles[run]:
407  filesToProcess.append(file)
408  else:
409  #print errors
410  print("This should never happen because if I have errors I return or exit! Run: " + str(run))
# Reached when the run is not in procKeys or is not lower than lastUnclosedRun.
411  else:
412  error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
413  print(error)
414  timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
# Once the timeout has expired, a missing run with at most rrTolerance lumis is skipped;
# a larger one is reported by e-mail and stops the scan.
415  if timeoutType == 1:
416  if len(RRList) <= rrTolerance:
417  error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
418  #print listOfRunsAndLumiFromRR[run]
419  print(error)
420  #sendEmail(mailList,error)
421  else:
422  error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
423  sendEmail(mailList,error)
424  return filesToProcess
425  #exit(error)
426  else:
427  if timeoutType == -1:
428  print("WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!")
429  else:
430  print("WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress.")
431  return filesToProcess
432 
433  return filesToProcess
def replace(string, replacements)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def compareLumiLists(listA, listB, errors=[], tolerance=0)
def getNumberOfFilesToProcessForRun(dataSet, run)
def selectFilesToProcess(listOfRunsAndLumiFromDBS, listOfRunsAndLumiFromRR, newRunList, runListDir, dataSet, mailList, dbsTolerance, dbsTolerancePercent, rrTolerance, missingFilesTolerance, missingLumisTimeout)
def timeoutManager(type, timeout=-1, fileName=".timeout")
def sendEmail(mailList, error)
General utilities.
#define str(s)
double split
Definition: MVATrainer.cc:139

Variable Documentation

BeamSpotWorkflow.error

Definition at line 51 of file BeamSpotWorkflow.py.