CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Functions | Variables
BeamSpotWorkflow Namespace Reference

Functions

def aselectFilesToProcess
 
def compareLumiLists
 
def getLastClosedRun
 
def getLastUploadedIOV
 General functions. More...
 
def getListOfFilesToProcess
 
def getListOfRunsAndLumiFromDBS
 
def getListOfRunsAndLumiFromFile
 
def getListOfRunsAndLumiFromRR
 
def getNewRunList
 
def getNumberOfFilesToProcessForRun
 
def getRunNumberFromDBSName
 
def getRunNumberFromFileName
 
def main
 
def removeUncompleteRuns
 
def selectFilesToProcess
 

Variables

string error = "Please set a crab environment in order to get the proper JSON lib"
 

Function Documentation

def BeamSpotWorkflow.aselectFilesToProcess (   listOfFilesToProcess,
  newRunList 
)

Definition at line 490 of file BeamSpotWorkflow.py.

References cmsRelvalreport.exit, getLastClosedRun(), getRunNumberFromDBSName(), and getRunNumberFromFileName().

def aselectFilesToProcess(listOfFilesToProcess,newRunList):
    """Select the result files that belong to completely processed, closed runs.

    Parameters:
        listOfFilesToProcess: DBS file names; the run of each one is obtained
            with getRunNumberFromDBSName.
        newRunList: result file names; the run of each one is obtained with
            getRunNumberFromFileName.

    Returns:
        The entries of newRunList (original relative order inside each run,
        runs in ascending order) whose run is <= the last closed run and for
        which the number of result files equals the number of DBS files.

    Exits (SystemExit via exit()) when a processed run is unknown to DBS or
    when the two file counts for a run differ.

    Changes with respect to the previous implementation:
      * the "does this run exist in DBS" check now happens before the status
        message is built, so the intended error is reported instead of a
        KeyError;
      * the call to getDBSLumiListForRun was dropped: its result was never
        used and no function of that name appears in this module's function
        list;
      * result files are grouped per run in one pass instead of re-scanning
        newRunList for every run.
    """
    # Number of DBS files per run.
    runsToProcess = {}
    for dbsName in listOfFilesToProcess:
        run = getRunNumberFromDBSName(dbsName)
        runsToProcess[run] = runsToProcess.get(run, 0) + 1

    # Result files grouped per run (insertion order kept inside each run).
    filesByRun = {}
    for fileName in newRunList:
        filesByRun.setdefault(getRunNumberFromFileName(fileName), []).append(fileName)

    #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
    lastClosedRun = getLastClosedRun(listOfFilesToProcess)

    selectedFiles = []
    for run in sorted(filesByRun):
        if run > lastClosedRun:
            # Run may still be open: leave it for a later pass.
            continue
        nProcessed = len(filesByRun[run])
        if run not in runsToProcess:
            exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
        nInDBS = runsToProcess[run]
        print("For run " + str(run) + " I have processed " + str(nProcessed) + " files and in DBS there are " + str(nInDBS) + " files!")
        if nProcessed != nInDBS:
            exit("ERROR: For run " + str(run) + " I have processed " + str(nProcessed) + " files but in DBS there are " + str(nInDBS) + " files!")
        selectedFiles.extend(filesByRun[run])
    return selectedFiles
def BeamSpotWorkflow.compareLumiLists (   listA,
  listB,
  errors = [],
  tolerance = 0 
)

Definition at line 435 of file BeamSpotWorkflow.py.

Referenced by selectFilesToProcess().

def compareLumiLists(listA,listB,errors=None,tolerance=0):
    """Report the lumi sections that appear in only one of two lists.

    Parameters:
        listA, listB: lists of (hashable) lumi-section numbers. Both lists
            are sorted in place, as before.
        errors: optional list that receives one human-readable message per
            problem found. When omitted, a fresh list is used for this call
            (the previous mutable default was shared between calls).
        tolerance: percentage of len(listB) by which listA may be shorter
            before the "number of lumi sections is different" message is
            added.

    Returns:
        (badA, badB): badA holds the lumis found in listB but not in listA,
        badB holds the lumis found in listA but not in listB, each in
        ascending order.
    """
    if errors is None:
        errors = []

    lenA = len(listA)
    lenB = len(listB)
    # The length check is one-sided on purpose: only a too-short listA is reported.
    if lenA < lenB-(lenB*float(tolerance)/100):
        errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")

    listA.sort()
    listB.sort()

    # Sets give O(1) membership tests; the lists are still walked so that
    # ordering and duplicates in the output match the list contents.
    inA = set(listA)
    inB = set(listB)

    badB = []
    for lumi in listA:
        if lumi not in inB:
            errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
            badB.append(lumi)

    badA = []
    for lumi in listB:
        if lumi not in inA:
            errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
            badA.append(lumi)

    return badA,badB
def BeamSpotWorkflow.getLastClosedRun (   DBSListOfFiles)

Definition at line 217 of file BeamSpotWorkflow.py.

References getRunNumberFromDBSName().

Referenced by aselectFilesToProcess().

def getLastClosedRun(DBSListOfFiles):
    """Return the second-highest run number found in a list of DBS file names.

    The highest run is treated as possibly still open, so the one just below
    it is reported. Returns -1 when fewer than two distinct run numbers are
    present.
    """
    distinctRuns = sorted(set(getRunNumberFromDBSName(name) for name in DBSListOfFiles))
    if len(distinctRuns) > 1:
        return long(distinctRuns[-2])
    # No closed run
    return -1
def BeamSpotWorkflow.getLastUploadedIOV (   tagName,
  destDB = "oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT" 
)

General functions.

Definition at line 56 of file BeamSpotWorkflow.py.

References cmsRelvalreport.exit, and spr.find().

Referenced by main().

56 
def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
    """Return the last IOV listed by cmscond_list_iov for a tag.

    Parameters:
        tagName: tag passed to cmscond_list_iov with -t.
        destDB: connection string passed with -c.

    Returns:
        1 when the command fails because the tag's metadata entry does not
        exist (a new tag will be created), otherwise the first column of the
        last "DB=" line of the listing, converted with long().

    Exits when the command fails for any other reason or when no IOV value
    can be extracted from the listing.
    """
    listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName
    status, listing = commands.getstatusoutput( listIOVCommand )
    if status != 0:
        missingTag = "metadata entry \"" + tagName + "\" does not exist"
        if missingTag not in listing:
            exit("ERROR: Can\'t connect to db because:\n" + listing)
        print("Creating a new tag because I got the following error contacting the DB")
        print(listing)
        return 1

    # The listing is requested a second time, this time filtered by the shell.
    lastIOVCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
    lastIOV = commands.getstatusoutput( lastIOVCommand )[1]

    #WARNING when we pass to lumi IOV this should be long long
    if lastIOV == '':
        exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")

    return long(lastIOV)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
def getLastUploadedIOV
General functions.
def BeamSpotWorkflow.getListOfFilesToProcess (   dataSet,
  lastRun = -1 
)

Definition at line 80 of file BeamSpotWorkflow.py.

References split.

80 
def getListOfFilesToProcess(dataSet,lastRun=-1):
    """Query DBS for the .root files of a dataset.

    When lastRun is not -1 the query is restricted to runs greater than
    lastRun. Returns the command output split on newlines; the exit status
    of the command is not inspected.
    """
    runFilter = "" if lastRun == -1 else " and run > " + str(lastRun)
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + runFilter + "\" | grep .root"
    listing = commands.getstatusoutput( queryCommand )[1]
    return listing.split('\n')
double split
Definition: MVATrainer.cc:139
def BeamSpotWorkflow.getListOfRunsAndLumiFromDBS (   dataSet,
  lastRun = -1 
)

Definition at line 100 of file BeamSpotWorkflow.py.

References python.multivaluedict.append(), cmsRelvalreport.exit, spr.find(), and split.

Referenced by main().

def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
    """Collect the (run, lumi) pairs DBS knows for one or more datasets.

    Parameters:
        dataSet: comma-separated dataset names; each one is queried separately.
        lastRun: when not -1, only runs greater than this value are requested.

    Returns:
        dict mapping run number -> list of lumi numbers, in the order the
        pairs appear in the command output.

    Exits when the last of up to three attempts for a dataset returns a
    non-zero status.
    """
    outputLines = []
    for data in dataSet.split(','):
        queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
        if lastRun != -1:
            queryCommand = queryCommand + " and run > " + str(lastRun)
        queryCommand += "\""
        print(" >> " + queryCommand)

        status, text = None, None
        for attempt in range(0,3):
            status, text = commands.getstatusoutput( queryCommand )
            looksLikeError = text.find("ERROR") != -1 or text.find("Error") != -1
            if status == 0 and not looksLikeError:
                break
        # NOTE(review): only the exit status is checked here; output that still
        # contains "ERROR"/"Error" after the last attempt is parsed as data.
        if status != 0:
            exit("ERROR: I can't contact DBS for the following reason:\n" + text)
        outputLines.extend(text.split('\n'))

    runsAndLumis = {}
    for out in outputLines:
        pair = re.search(r'(\d+)\s+(\d+)',out)
        if not pair:
            continue
        run = long(pair.group(1))
        lumi = long(pair.group(2))
        runsAndLumis.setdefault(run, []).append(lumi)

    return runsAndLumis
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
double split
Definition: MVATrainer.cc:139
def BeamSpotWorkflow.getListOfRunsAndLumiFromFile (   firstRun = -1,
  fileName = "" 
)

Definition at line 135 of file BeamSpotWorkflow.py.

Referenced by main().

def getListOfRunsAndLumiFromFile(firstRun=-1,fileName=""):
    """Load a run -> lumi-ranges mapping from a JSON file.

    Parameters:
        firstRun: accepted for signature compatibility; it is not used.
        fileName: path of a JSON file whose top-level object is keyed by run
            number (as a string).

    Returns:
        dict with the same values as the JSON object and the keys converted
        to integers. int() is used instead of long() so the function also
        runs on Python 3; on Python 2, int() still yields a long for values
        outside the native int range.

    The file is opened in a with-block so the handle is closed even when
    reading fails.
    """
    with open(fileName) as jsonFile:
        jsonList = json.loads(jsonFile.read())

    selected_dcs = {}
    for element in jsonList:
        selected_dcs[int(element)] = jsonList[element]
    return selected_dcs
def BeamSpotWorkflow.getListOfRunsAndLumiFromRR (   firstRun = -1)

Definition at line 147 of file BeamSpotWorkflow.py.

Referenced by main().

def _exportFromRunRegistry(server, table, template, selection, retryMessage, maxAttempts=3):
    """Call server.DataExporter.export with retries.

    Returns the exported data, or None when all maxAttempts attempts raised.
    Between attempts the retry message and attempt counter are printed and
    the function sleeps two seconds.
    """
    for attempt in range(maxAttempts):
        try:
            return server.DataExporter.export(table, 'GLOBAL', template, selection)
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
            print(retryMessage + " " + str(attempt) + " / " + str(maxAttempts))
            time.sleep(2)
    return None


def getListOfRunsAndLumiFromRR(firstRun=-1):
    """Fetch from the run registry the lumi ranges of the "Collisions10" runs.

    Parameters:
        firstRun: only runs with a number greater than this are requested.

    Returns:
        dict mapping run number (integer) -> value reported by the registry
        for that run; runs listed by the registry's run table but missing in
        the lumi-section export are mapped to [[]].
        An empty dict is returned when the registry cannot be reached after
        three attempts, or when no run is listed (previously the latter
        raised IndexError).
    """
    RunReg = "http://pccmsdqm04.cern.ch/runregistry"
    Group = "Collisions10"

    # get handler to RR XML-RPC server
    server = xmlrpclib.ServerProxy(RunReg + "/xmlrpc")

    sel_runtable = "{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun)
    run_data = _exportFromRunRegistry(
        server, 'RUN', 'csv_runs', sel_runtable,
        "Something wrong in accessing runregistry, retrying in 2s....")
    if run_data is None:
        # This message used to be assigned to a local and never shown.
        print("Run registry unaccessible.....exiting now")
        return {}

    # First CSV column of every line that holds a run number.
    listOfRuns = []
    for line in run_data.split("\n"):
        run = line.split(',')[0]
        if run.isdigit():
            listOfRuns.append(run)

    if not listOfRuns:
        # Nothing to look up; equivalent to the empty mapping the loop below would build.
        return {}

    # The code takes the last listed run as lower bound and the first as upper bound.
    firstRun = listOfRuns[-1]
    lastRun = listOfRuns[0]
    sel_dcstable = "{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"

    dcs_data = _exportFromRunRegistry(
        server, 'RUNLUMISECTION', 'json', sel_dcstable,
        "I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....")
    if dcs_data is None:
        print("Run registry unaccessible.....exiting now")
        return {}

    selected_dcs = {}
    jsonList = json.loads(dcs_data)
    for element in listOfRuns:
        if element in jsonList:
            selected_dcs[int(element)] = jsonList[element]
        else:
            print("WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!")
            selected_dcs[int(element)] = [[]]
    return selected_dcs
def BeamSpotWorkflow.getNewRunList (   fromDir,
  lastUploadedIOV 
)

Definition at line 246 of file BeamSpotWorkflow.py.

References getRunNumberFromFileName(), and python.rootplot.utilities.ls().

Referenced by main().

def getNewRunList(fromDir,lastUploadedIOV):
    """List the ".txt" files in fromDir whose run number is above lastUploadedIOV.

    The run number of each file is taken from getRunNumberFromFileName; the
    order returned by ls() is kept.
    """
    return [
        fileName
        for fileName in ls(fromDir,".txt")
        if getRunNumberFromFileName(fileName) > lastUploadedIOV
    ]
def BeamSpotWorkflow.getNumberOfFilesToProcessForRun (   dataSet,
  run 
)

Definition at line 90 of file BeamSpotWorkflow.py.

References split.

Referenced by removeUncompleteRuns(), and selectFilesToProcess().

90 
def getNumberOfFilesToProcessForRun(dataSet,run):
    """Count the .root files DBS lists for one run of a dataset.

    Returns 0 when the query pipeline exits with a non-zero status,
    otherwise the number of newline-separated lines in its output.
    """
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
    status, listing = commands.getstatusoutput( queryCommand )
    if status == 0:
        return len(listing.split('\n'))
    return 0
def getNumberOfFilesToProcessForRun
double split
Definition: MVATrainer.cc:139
def BeamSpotWorkflow.getRunNumberFromDBSName (   fileName)

Definition at line 239 of file BeamSpotWorkflow.py.

Referenced by aselectFilesToProcess(), and getLastClosedRun().

def getRunNumberFromDBSName(fileName):
    """Extract the run number from a DBS-style file path.

    The path must contain three consecutive all-digit components followed by
    a component that starts with a non-digit, e.g. ".../000/132/440/name.root".
    The run number is the second and third digit components concatenated
    ("132" + "440" -> 132440).

    Returns the run number as an integer, or -1 when the pattern is not found.
    int() replaces long() so the function also runs on Python 3; on Python 2
    int() still yields a long for values outside the native int range.

    NOTE(review): because the trailing group is (\\D+), a file whose base name
    starts with a digit does not match and yields -1 - confirm this is intended.
    """
    # Raw string: \D and \d are regex classes, not Python string escapes.
    regExp = re.search(r'(\D+)/(\d+)/(\d+)/(\d+)/(\D+)',fileName)
    if not regExp:
        return -1
    return int(regExp.group(3)+regExp.group(4))
def BeamSpotWorkflow.getRunNumberFromFileName (   fileName)

Definition at line 231 of file BeamSpotWorkflow.py.

Referenced by aselectFilesToProcess(), getNewRunList(), and removeUncompleteRuns().

def getRunNumberFromFileName(fileName):
    """Extract the run number from a result-file name.

    The name must contain "<non-digits>_<digits>_<digits>_"; the second digit
    group is taken as the run number.

    Returns the run number as an integer, or -1 when the pattern is not found.
    int() replaces long() so the function also runs on Python 3; on Python 2
    int() still yields a long for values outside the native int range.
    """
    # Raw string: \D and \d are regex classes, not Python string escapes.
    regExp = re.search(r'(\D+)_(\d+)_(\d+)_',fileName)
    if not regExp:
        return -1
    return int(regExp.group(3))
def BeamSpotWorkflow.main ( )

Definition at line 532 of file BeamSpotWorkflow.py.

References CommonMethods.appendSqliteFile(), CommonMethods.checkLock(), CommonMethods.cp(), CommonMethods.createWeightedPayloads(), CommonMethods.dirExists(), CommonMethods.dumpValues(), cmsRelvalreport.exit, mergeVDriftHistosByStation.file, getLastUploadedIOV(), getListOfRunsAndLumiFromDBS(), getListOfRunsAndLumiFromFile(), getListOfRunsAndLumiFromRR(), getNewRunList(), if(), CommonMethods.lock(), timeUnitHelper.pack(), triggerExpression.parse(), CommonMethods.readBeamSpotFile(), CommonMethods.readSqliteFile(), CommonMethods.rmLock(), selectFilesToProcess(), CommonMethods.sendEmail(), CommonMethods.setLockName(), CommonMethods.sortAndCleanBeamList(), timeUnitHelper.unpackLumiid(), CommonMethods.uploadSqliteFile(), and CommonMethods.writeSqliteFile().

533 def main():
 # Script entry point. In order: parse options, optionally take a lock, read
 # the configuration file, find result files newer than the last uploaded IOV,
 # archive them, cross-check them against DBS and the run registry, build one
 # sqlite file per payload, merge them into a single sqlite file with a
 # metadata card, optionally upload it, and archive the outputs.
 # Most failure paths end in exit(...).
534  ######### COMMAND LINE OPTIONS ##############
535  option,args = parse(__doc__)
536 
537  ######### Check if there is already a megascript running ########
538  if option.lock:
539  setLockName('.' + option.lock)
540  if checkLock():
541  print "There is already a megascript runnning...exiting"
542  return
543  else:
544  lock()
 # NOTE(review): none of the exit(...) calls below is preceded by rmLock();
 # whether the lock is meant to survive a failed pass is not visible here - confirm.
545 
546 
 # Production destination by default; option.Test switches to the prep DB.
547  destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
548  if option.Test:
549  destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'
550 
551  ######### CONFIGURATION FILE ################
552  cfgFile = "BeamSpotWorkflow.cfg"
553  if option.cfg:
554  cfgFile = option.cfg
 # NOTE(review): os.getenv returns None when CMSSW_BASE is unset, in which case
 # this concatenation raises TypeError.
555  configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
556  configuration = ConfigParser.ConfigParser()
557  print 'Reading configuration from ', configurationFile
558  configuration.read(configurationFile)
559 
 # All settings are read from the [Common] section; tolerances and the timeout
 # are converted to float.
560  sourceDir = configuration.get('Common','SOURCE_DIR')
561  archiveDir = configuration.get('Common','ARCHIVE_DIR')
562  workingDir = configuration.get('Common','WORKING_DIR')
563  databaseTag = configuration.get('Common','DBTAG')
564  dataSet = configuration.get('Common','DATASET')
565  fileIOVBase = configuration.get('Common','FILE_IOV_BASE')
566  dbIOVBase = configuration.get('Common','DB_IOV_BASE')
567  dbsTolerance = float(configuration.get('Common','DBS_TOLERANCE'))
568  dbsTolerancePercent = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
569  rrTolerance = float(configuration.get('Common','RR_TOLERANCE'))
570  missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
571  missingLumisTimeout = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
572  jsonFileName = configuration.get('Common','JSON_FILE')
573  mailList = configuration.get('Common','EMAIL')
574 
575  ######### DIRECTORIES SETUP #################
 # Each directory path is normalised to end with '/'. The source directory
 # must already exist; archive and working directories are created if missing.
576  if sourceDir[len(sourceDir)-1] != '/':
577  sourceDir = sourceDir + '/'
578  if not dirExists(sourceDir):
579  error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
580  sendEmail(mailList,error)
581  exit(error)
582 
583  if archiveDir[len(archiveDir)-1] != '/':
584  archiveDir = archiveDir + '/'
585  if not os.path.isdir(archiveDir):
586  os.mkdir(archiveDir)
587 
588  if workingDir[len(workingDir)-1] != '/':
589  workingDir = workingDir + '/'
590  if not os.path.isdir(workingDir):
591  os.mkdir(workingDir)
592  else:
 # An existing working directory is emptied (non-recursive rm -f on its entries).
593  os.system("rm -f "+ workingDir + "*")
594 
595 
596  print "Getting last IOV for tag: " + databaseTag
597  lastUploadedIOV = 1
 # For the production destination getLastUploadedIOV is called without destDB,
 # i.e. with that function's own default connection string.
598  if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT":
599  lastUploadedIOV = getLastUploadedIOV(databaseTag)
600  else:
601  lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)
602 
603  #lastUploadedIOV = 133885
604  #lastUploadedIOV = 575216380019329
 # With lumi-based IOVs only the "run" entry of the unpacked value is kept.
605  if dbIOVBase == "lumiid":
606  lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]
607 
608  ######### Get list of files processed after the last IOV
609  print "Getting list of files processed after IOV " + str(lastUploadedIOV)
610  newProcessedRunList = getNewRunList(sourceDir,lastUploadedIOV)
611  if len(newProcessedRunList) == 0:
612  exit("There are no new runs after " + str(lastUploadedIOV))
613 
614  ######### Copy files to archive directory
615  print "Copying files to archive directory"
616  copiedFiles = []
 # Up to three copy attempts; success means as many files copied as requested.
617  for i in range(3):
618  copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)
619  if len(copiedFiles) == len(newProcessedRunList):
620  break;
621  if len(copiedFiles) != len(newProcessedRunList):
622  error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList))
623  sendEmail(mailList,error)
624  exit(error)
625 
626 
627  ######### Get from DBS the list of files after last IOV
628  #listOfFilesToProcess = getListOfFilesToProcess(dataSet,lastUploadedIOV)
629  print "Getting list of files from DBS"
630  listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
631  if len(listOfRunsAndLumiFromDBS) == 0:
632  exit("There are no files in DBS to process")
633  print "Getting list of files from RR"
634  listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromRR(lastUploadedIOV)
 # An empty result from the run registry triggers the fallback to the JSON file.
635  if(not listOfRunsAndLumiFromRR):
636  print "Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName
637  listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName)
638  ######### Get list of files to process for DB
639  #selectedFilesToProcess = selectFilesToProcess(listOfFilesToProcess,copiedFiles)
640  #completeProcessedRuns = removeUncompleteRuns(copiedFiles,dataSet)
641  #print copiedFiles
642  #print completeProcessedRuns
643  #exit("complete")
644  print "Getting list of files to process"
645  selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
646  if len(selectedFilesToProcess) == 0:
647  exit("There are no files to process")
648 
649  #print selectedFilesToProcess
650  ######### Copy files to working directory
651  print "Copying files from archive to working directory"
652  copiedFiles = []
653  for i in range(3):
654  copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)
655  if len(copiedFiles) == len(selectedFilesToProcess):
656  break;
657  else:
 # NOTE(review): "rm -rf workingDir" removes the directory itself, not only its
 # contents, before the next cp attempt - confirm cp copes with a missing target.
 # The listing's indentation is lost, so whether this else pairs with the if or
 # with the for cannot be told from here.
658  commands.getstatusoutput("rm -rf " + workingDir)
659  if len(copiedFiles) != len(selectedFilesToProcess):
660  error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir
661  sendEmail(mailList,error)
662  exit(error)
663 
664  print "Sorting and cleaning beamlist"
 # readBeamSpotFile is given beamSpotObjList to fill, one call per copied file.
665  beamSpotObjList = []
666  for fileName in copiedFiles:
667  readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)
668 
669  sortAndCleanBeamList(beamSpotObjList,fileIOVBase)
670 
671  if len(beamSpotObjList) == 0:
672  error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
673  exit(error)
674 
675  payloadFileName = "PayloadFile.txt"
676 
677  runBased = False
678  if dbIOVBase == "runnumber":
679  runBased = True
680 
681  payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
682  if len(payloadList) == 0:
683  error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
684  exit(error)
685 
686 
687  tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
688  tmpSqliteFileName = workingDir + "SingleTmpSqliteFile.db"
689 
690  writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
691  readDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
692  payloadNumber = -1
 # IOV boundaries of the merged output, kept as strings; filled in the loop below.
693  iovSinceFirst = '0';
694  iovTillLast = '0';
695 
696  #Creating the final name for the combined sqlite file
697  uuid = commands.getstatusoutput('uuidgen -t')[1]
698  final_sqlite_file_name = databaseTag + '@' + uuid
699  sqlite_file = workingDir + final_sqlite_file_name + ".db"
700  metadata_file = workingDir + final_sqlite_file_name + ".txt"
701 
 # Per payload: dump it to a temporary text file, write and read back a
 # temporary sqlite file, compute its IOV range, append it to the combined
 # sqlite file, then delete the temporaries.
702  for payload in payloadList:
703  payloadNumber += 1
 # option.zlarge overrides the payload's sigmaZ and its error with fixed values.
704  if option.zlarge:
705  payload.sigmaZ = 10
706  payload.sigmaZerr = 2.5e-05
 # file(...) is the Python 2 builtin equivalent of open(...).
707  tmpFile = file(tmpPayloadFileName,'w')
708  dumpValues(payload,tmpFile)
709  tmpFile.close()
710  if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
711  error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
712  exit(error)
713  readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)
714 
715  ##############################################################
716  #WARNING I am not sure if I am packing the right values
717  if dbIOVBase == "runnumber":
718  iov_since = str(payload.Run)
719  iov_till = iov_since
720  elif dbIOVBase == "lumiid":
721  iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
722  iov_till = str( pack(int(payload.Run), int(payload.IOVlast)) )
723  elif dbIOVBase == "timestamp":
724  error = "ERROR: IOV " + dbIOVBase + " still not implemented."
725  exit(error)
726  else:
727  error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
728  exit(error)
729 
 # Remember the "since" of the first payload and the "till" of the last one.
730  if payloadNumber == 0:
731  iovSinceFirst = iov_since
732  if payloadNumber == len(payloadList)-1:
733  iovTillLast = iov_till
734 
735  appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
736  os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)
737 
738 
739  #### CREATE payload for merged output
740 
741  print " create MERGED payload card for dropbox ..."
742 
 # Metadata card written next to the combined sqlite file.
743  dfile = open(metadata_file,'w')
744 
745  dfile.write('destDB ' + destDB +'\n')
746  dfile.write('tag ' + databaseTag +'\n')
747  dfile.write('inputtag' +'\n')
748  dfile.write('since ' + iovSinceFirst +'\n')
749  #dfile.write('till ' + iov_till +'\n')
750  dfile.write('Timetype '+ dbIOVBase +'\n')
751 
752  ###################################################
753  # WARNING tagType forced to offline
754  print "WARNING TAG TYPE forced to be just offline"
 # tagType is hard-coded, so the "express" branch below is never taken.
755  tagType = "offline"
756  checkType = tagType
757  if tagType == "express":
758  checkType = "hlt"
759  dfile.write('IOVCheck ' + checkType + '\n')
760  dfile.write('usertext Beam spot position\n')
761 
762  dfile.close()
763 
764 
765 
766  if option.upload:
767  print " scp files to offline Drop Box"
768  dropbox = "/DropBox"
769  if option.Test:
770  dropbox = "/DropBox_test"
771  print "UPLOADING TO TEST DB"
772  uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)
773 
 # Archive the sqlite file, its metadata card and the payload text file under
 # archiveDir/payloads, with the IOV range embedded in the file names.
774  archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
775  archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
776  if not os.path.isdir(archiveDir + 'payloads'):
777  os.mkdir(archiveDir + 'payloads')
778  commands.getstatusoutput('mv ' + sqlite_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
779  commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
780  commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)
781 
782  print archiveDir + "payloads/" + archive_sqlite_file_name + '.db'
783  print archiveDir + "payloads/" + archive_sqlite_file_name + '.txt'
784 
 # rmLock() is called here whether or not option.lock was given.
785  rmLock()
def sendEmail
General utilities.
Evaluator * parse(const T &text)
def createWeightedPayloads
CREATE FILE FOR PAYLOADS.
def getLastUploadedIOV
General functions.
perl if(1 lt scalar(@::datatypes))
Definition: edlooper.cc:31
def sortAndCleanBeamList
Sort and clean list of data for consecutive duplicates and bad fits.
def BeamSpotWorkflow.removeUncompleteRuns (   newRunList,
  dataSet 
)

Definition at line 474 of file BeamSpotWorkflow.py.

References getNumberOfFilesToProcessForRun(), and getRunNumberFromFileName().

def removeUncompleteRuns(newRunList,dataSet):
    """Print, for every run in newRunList, whether all its DBS files were processed.

    The number of result files per run (run taken from
    getRunNumberFromFileName) is compared with
    getNumberOfFilesToProcessForRun(dataSet, run). Despite the name, nothing
    is removed and nothing is returned; the function only prints.
    """
    filesPerRun = {}
    for fileName in newRunList:
        run = getRunNumberFromFileName(fileName)
        filesPerRun[run] = filesPerRun.get(run, 0) + 1

    for run in filesPerRun.keys():
        nProcessed = filesPerRun[run]
        nFiles = getNumberOfFilesToProcessForRun(dataSet,run)
        if nProcessed >= nFiles:
            print("All files have been processed for run: " + str(run) + " (" + str(nProcessed) + " out of " + str(nFiles) + ")")
        else:
            print("I haven't processed all files yet : " + str(nProcessed) + " out of " + str(nFiles) + " for run: " + str(run))
def getNumberOfFilesToProcessForRun
def BeamSpotWorkflow.selectFilesToProcess (   listOfRunsAndLumiFromDBS,
  listOfRunsAndLumiFromRR,
  newRunList,
  runListDir,
  dataSet,
  mailList,
  dbsTolerance,
  dbsTolerancePercent,
  rrTolerance,
  missingFilesTolerance,
  missingLumisTimeout 
)

Definition at line 257 of file BeamSpotWorkflow.py.

References python.multivaluedict.append(), compareLumiLists(), cmsRelvalreport.exit, getNumberOfFilesToProcessForRun(), linker.replace(), CommonMethods.sendEmail(), split, and CommonMethods.timeoutManager().

Referenced by main().

258 def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
259  runsAndLumisProcessed = {}
260  runsAndFiles = {}
261  for fileName in newRunList:
262  file = open(runListDir+fileName)
263  for line in file:
264  if line.find("Runnumber") != -1:
265  run = long(line.replace('\n','').split(' ')[1])
266  elif line.find("LumiRange") != -1:
267  lumiLine = line.replace('\n','').split(' ')
268  begLumi = long(lumiLine[1])
269  endLumi = long(lumiLine[3])
270  if begLumi != endLumi:
271  error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
272  exit(error)
273  else:
274  if not run in runsAndLumisProcessed:
275  runsAndLumisProcessed[run] = []
276  if begLumi in runsAndLumisProcessed[run]:
277  print "Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!"
278  else:
279  runsAndLumisProcessed[run].append(begLumi)
280  if not run in runsAndFiles:
281  runsAndFiles[run] = []
282  runsAndFiles[run].append(fileName)
283  file.close()
284 
285  rrKeys = listOfRunsAndLumiFromRR.keys()
286  rrKeys.sort()
287  dbsKeys = listOfRunsAndLumiFromDBS.keys()
288  dbsKeys.sort()
289  #I remove the last entry from DBS since I am not sure it is an already closed run!
290  lastUnclosedRun = dbsKeys.pop()
291  #print "Last unclosed run: " + str(lastUnclosedRun)
292  procKeys = runsAndLumisProcessed.keys()
293  procKeys.sort()
294  #print "Run Registry:"
295  #print rrKeys
296  #print "DBS:"
297  #print dbsKeys
298  #print "List:"
299  #print procKeys
300  #print lastUnclosedRun
301  filesToProcess = []
302  for run in rrKeys:
303  RRList = []
304  for lumiRange in listOfRunsAndLumiFromRR[run]:
305  if lumiRange != []:
306  for l in range(lumiRange[0],lumiRange[1]+1):
307  RRList.append(long(l))
308  if run in procKeys and run < lastUnclosedRun:
309  #print "run " + str(run) + " is in procKeys"
310  if not run in dbsKeys and run != lastUnclosedRun:
311  error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!"
312  exit(error)
313  print "Working on run " + str(run)
314  nFiles = 0
315  for data in dataSet.split(','):
316  nFiles = getNumberOfFilesToProcessForRun(data,run)
317  if nFiles != 0:
318  break
319  if len(runsAndFiles[run]) < nFiles:
320  print "I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run)
321  if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
322  timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
323  timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
324  if timeoutType == 1:
325  print "WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!"
326  else:
327  if timeoutType == -1:
328  print "WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
329  else:
330  print "WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress."
331  return filesToProcess
332  else:
333  timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
334  if timeoutType == 1:
335  error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
336  sendEmail(mailList,error)
337  return filesToProcess
338  #exit(error)
339  else:
340  if timeoutType == -1:
341  print "WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
342  else:
343  print "WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress."
344  return filesToProcess
345 
346  else:
347  timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
348  timeoutManager("DBS_MISMATCH_Run"+str(run))
349  print "I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!"
350  errors = []
351  badProcessed = []
352  badDBSProcessed = []
353  badDBS = []
354  badRRProcessed = []
355  badRR = []
356  #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
357  badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
358  for i in range(0,len(errors)):
359  errors[i] = errors[i].replace("listA","the processed lumis")
360  errors[i] = errors[i].replace("listB","DBS")
361  #print errors
362  #print badProcessed
363  #print badDBS
364  #exit("ciao")
365  if len(badDBS) != 0:
366  print "This is weird because I processed more lumis than the ones that are in DBS!"
367  if len(badDBSProcessed) != 0 and run in rrKeys:
368  lastError = len(errors)
369  #print RRList
370  #It is important for runsAndLumisProcessed[run] to be the first argument because the comparison is not ==
371  badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
372  for i in range(0,len(errors)):
373  errors[i] = errors[i].replace("listA","the processed lumis")
374  errors[i] = errors[i].replace("listB","Run Registry")
375  #print errors
376  #print badProcessed
377  #print badRunRegistry
378 
379  if len(badRRProcessed) != 0:
380  print "I have not processed some of the lumis that are in the run registry for run: " + str(run)
381  for lumi in badDBSProcessed:
382  if lumi in badRRProcessed:
383  badProcessed.append(lumi)
384  lenA = len(badProcessed)
385  lenB = len(RRList)
386  if 100.*lenA/lenB <= dbsTolerancePercent:
387  print "WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
388  #print errors
389  badProcessed = []
390  elif lenA <= dbsTolerance:
391  print "WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
392  #print errors
393  badProcessed = []
394  else:
395  error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
396  sendEmail(mailList,error)
397  print error
398  return filesToProcess
399  #exit(errors)
400  #return filesToProcess
401  elif len(errors) != 0:
402  print "The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!"
403  #print errors
404 
405  #If I get here it means that I passed either the DBS or the RR test
406  if len(badProcessed) == 0:
407  for file in runsAndFiles[run]:
408  filesToProcess.append(file)
409  else:
410  #print errors
411  print "This should never happen because if I have errors I return or exit! Run: " + str(run)
412  else:
413  error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
414  print error
415  timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
416  if timeoutType == 1:
417  if len(RRList) <= rrTolerance:
418  error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
419  #print listOfRunsAndLumiFromRR[run]
420  print error
421  #sendEmail(mailList,error)
422  else:
423  error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
424  sendEmail(mailList,error)
425  return filesToProcess
426  #exit(error)
427  else:
428  if timeoutType == -1:
429  print "WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!"
430  else:
431  print "WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress."
432  return filesToProcess
433 
return filesToProcess
def sendEmail
General utilities.
def replace
Definition: linker.py:10
def getNumberOfFilesToProcessForRun
double split
Definition: MVATrainer.cc:139

Variable Documentation

string BeamSpotWorkflow.error = "Please set a crab environment in order to get the proper JSON lib"

Definition at line 50 of file BeamSpotWorkflow.py.