CMS 3D CMS Logo

BeamSpotWorkflow.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 #____________________________________________________________
3 #
4 # BeamSpotWorkflow
5 #
6 # A very complicated way to automate the beam spot workflow
7 #
8 # Francisco Yumiceva, Lorenzo Uplegger
9 # yumiceva@fnal.gov, uplegger@fnal.gov
10 #
11 # Fermilab, 2010
12 #
13 #____________________________________________________________
14 
15 """
16  BeamSpotWorkflow.py
17 
18  A very complicate script to upload the results into the DB
19 
20  usage: %prog -d <data file/directory> -t <tag name>
21  -c, --cfg = CFGFILE : Use a different configuration file than the default
22  -l, --lock = LOCK : Create a lock file to have just one script running
23  -o, --overwrite : Overwrite results files when copying.
24  -T, --Test : Upload files to Test dropbox for data validation.
25  -u, --upload : Upload files to offline drop box via scp.
26  -z, --zlarge : Enlarge sigmaZ to 10 +/- 0.005 cm.
27 
28  Francisco Yumiceva (yumiceva@fnal.gov)
29  Lorenzo Uplegger (send an email to Francisco)
30  Fermilab 2010
31 
32 """
from __future__ import print_function

import configparser as ConfigParser
import datetime
import os
import re
import subprocess
import sys
import time
from builtins import range

try:
    import xmlrpclib  # Python 2 module name
except ImportError:
    import xmlrpc.client as xmlrpclib  # same API under its Python 3 name

from BeamSpotObj import BeamSpot
from IOVObj import IOV
from CommonMethods import *
45 
# json ships with Python >= 2.6; older interpreters need the external
# simplejson package.  Only ImportError is handled so that unrelated failures
# (e.g. KeyboardInterrupt) are not misreported as a missing JSON library.
try:
    import json
except ImportError:
    try:
        import simplejson as json
    except ImportError:
        error = "Please set a crab environment in order to get the proper JSON lib"
        exit(error)
54 
55 
def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
    """Return the last IOV listed by ``cmscond_list_iov`` for ``tagName``.

    Parameters:
        tagName: name of the conditions tag to look up.
        destDB:  connection string handed to ``cmscond_list_iov -c``.

    Returns:
        The last IOV as an int, or 1 when the tool reports that the tag's
        metadata entry does not exist (a new tag will be created).

    Exits (through ``exit``) when the listing command fails for any other
    reason, or when the tag exists but no IOV value can be extracted.
    """
    listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName
    status, listing = subprocess.getstatusoutput(listIOVCommand)
    if status != 0:
        if "metadata entry \"" + tagName + "\" does not exist" in listing:
            print("Creating a new tag because I got the following error contacting the DB")
            print(listing)
            return 1
        exit("ERROR: Can\'t connect to db because:\n" + listing)

    # Keep only the first column of the last line that mentions "DB=".
    aCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
    output = subprocess.getstatusoutput(aCommand)

    if output[1] == '':
        exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")

    # Python 3 has no ``long``; ``int`` is arbitrary precision, so packed
    # lumi-based IOVs still fit.
    return int(output[1])
80 
81 
def getListOfFilesToProcess(dataSet,lastRun=-1):
    """Return the ``.root`` file names that a ``dbs`` search reports for ``dataSet``.

    Parameters:
        dataSet: dataset name inserted into the DBS query.
        lastRun: when different from -1, only files with run > lastRun are
                 requested.

    Returns:
        A list with one entry per non-empty output line of the query command.
        The command's exit status is not inspected.
    """
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet
    if lastRun != -1:
        queryCommand += " and run > " + str(lastRun)
    queryCommand += "\" | grep .root"
    output = subprocess.getstatusoutput(queryCommand)
    # ''.split('\n') is [''], so drop blank lines: an empty answer must give
    # an empty list rather than a single bogus file name.
    return [line for line in output[1].split('\n') if line]
90 
91 
def getNumberOfFilesToProcessForRun(dataSet,run):
    """Return how many ``.root`` files a ``dbs`` search lists for one run.

    Parameters:
        dataSet: dataset name inserted into the DBS query.
        run:     run number the query is restricted to.

    Returns:
        The number of output lines of the query, or 0 when the command
        pipeline exits with a non-zero status.
    """
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
    output = subprocess.getstatusoutput(queryCommand)
    if output[0] != 0:
        return 0
    return len(output[1].split('\n'))
100 
101 
def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
    """Query DBS for the (run, lumi) pairs of one or more datasets.

    Parameters:
        dataSet: comma-separated list of dataset names; each is queried
                 separately and the answers are merged.
        lastRun: when different from -1, only runs > lastRun are requested.

    Returns:
        dict mapping run number (int) to the list of lumi numbers (int) found
        in the command output, in output order.

    Exits (through ``exit``) when the last attempt for a dataset ends with a
    non-zero status.
    """
    outputLines = []
    for data in dataSet.split(','):
        queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
        if lastRun != -1:
            queryCommand = queryCommand + " and run > " + str(lastRun)
        queryCommand += "\""
        print(" >> " + queryCommand)
        output = []
        # Up to three attempts; an answer counts as good only if the status is
        # zero and the text contains neither "ERROR" nor "Error".
        for i in range(0,3):
            output = subprocess.getstatusoutput( queryCommand )
            if output[0] == 0 and "ERROR" not in output[1] and "Error" not in output[1]:
                break
        if output[0] != 0:
            exit("ERROR: I can't contact DBS for the following reason:\n" + output[1])
        outputLines.extend(output[1].split('\n'))

    # A data line is "<run> <lumi>"; every other line is ignored.
    runLumiPattern = re.compile(r'(\d+)\s+(\d+)')
    runsAndLumis = {}
    for line in outputLines:
        match = runLumiPattern.search(line)
        if match:
            # Python 3 has no ``long``; ``int`` is arbitrary precision.
            run = int(match.group(1))
            lumi = int(match.group(2))
            runsAndLumis.setdefault(run, []).append(lumi)

    return runsAndLumis
135 
136 
def getListOfRunsAndLumiFromFile(firstRun=-1,fileName=""):
    """Load a run -> lumi-ranges mapping from a JSON file.

    Parameters:
        firstRun: accepted for signature compatibility; not used here.
        fileName: path of a JSON file whose top-level object is keyed by run
                  number (as a string).

    Returns:
        dict with the same values as the JSON object, keyed by int run number.
    """
    # The with-statement closes the file even if reading or parsing fails.
    with open(fileName) as jsonFile:
        jsonList = json.loads(jsonFile.read())

    # JSON object keys are always strings; Python 3 has no ``long``, and
    # ``int`` is arbitrary precision.
    return {int(element): jsonList[element] for element in jsonList}
147 
148 
def getListOfRunsAndLumiFromRR(firstRun=-1):
    """Fetch runs and their lumi ranges from the Run Registry XML-RPC service.

    Two exports are requested: the list of runs of group "Collisions10" with
    run number > ``firstRun`` (CSV), then the lumi sections of those runs for
    which every listed ``parDcs*`` flag equals 1 (JSON).

    Parameters:
        firstRun: lower bound (exclusive) for the run numbers requested.

    Returns:
        dict mapping run number (int) to the value the service returned for
        that run; runs missing from the second export map to ``[[]]``.
        An empty dict is returned when either export fails three times.
    """
    RunReg = "http://pccmsdqm04.cern.ch/runregistry"
    Group = "Collisions10"
    maxAttempts = 3

    # get handler to RR XML-RPC server
    server = xmlrpclib.ServerProxy(RunReg + "/xmlrpc")
    sel_runtable = "{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun)

    # ``except Exception`` (not a bare except) so Ctrl-C still interrupts.
    for tries in range(maxAttempts):
        try:
            run_data = server.DataExporter.export('RUN', 'GLOBAL', 'csv_runs', sel_runtable)
            break
        except Exception:
            print("Something wrong in accessing runregistry, retrying in 2s....", tries, "/", maxAttempts)
            time.sleep(2)
    else:
        # Every attempt failed: the registry is not reachable.
        return {}

    # Keep the first CSV column of every line that is a plain number.
    listOfRuns = []
    for line in run_data.split("\n"):
        run = line.split(',')[0]
        if run.isdigit():
            listOfRuns.append(run)

    # NOTE(review): the bounds below assume the export lists runs from newest
    # to oldest - confirm against the service.  An export without any numeric
    # run still raises IndexError here, exactly as before.
    firstRun = listOfRuns[-1]
    lastRun = listOfRuns[0]
    sel_dcstable="{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"

    for tries in range(maxAttempts):
        try:
            dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json', sel_dcstable)
            break
        except Exception:
            print("I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....", tries, "/", maxAttempts)
            time.sleep(2)
    else:
        return {}

    selected_dcs = {}
    jsonList = json.loads(dcs_data)

    for element in listOfRuns:
        # Python 3 has no ``long``; ``int`` is arbitrary precision.
        if element in jsonList:
            selected_dcs[int(element)] = jsonList[element]
        else:
            print("WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!")
            selected_dcs[int(element)] = [[]]
    return selected_dcs
217 
218 
def getLastClosedRun(DBSListOfFiles):
    """Return the second-highest run number found in a list of DBS file names.

    The highest run is skipped, so the run just below it is reported as the
    last closed one.

    Parameters:
        DBSListOfFiles: iterable of DBS file names, each passed to
                        getRunNumberFromDBSName.

    Returns:
        The second-highest distinct run number as an int, or -1 when fewer
        than two distinct run numbers are found.
    """
    # A set removes duplicates in one pass instead of list.count per file.
    runs = sorted({getRunNumberFromDBSName(fileName) for fileName in DBSListOfFiles})

    if len(runs) <= 1: #No closed run
        return -1
    # Python 3 has no ``long``; ``int`` is arbitrary precision.
    return int(runs[-2])
231 
232 
def getRunNumberFromFileName(fileName):
    """Extract the run number from a result file name.

    The name must contain ``<non-digits>_<digits>_<digits>_``; the second
    group of digits is taken as the run number.

    Returns:
        The run number as an int, or -1 when the pattern is not found.
    """
    regExp = re.search(r'(\D+)_(\d+)_(\d+)_',fileName)
    if not regExp:
        return -1
    # Python 3 has no ``long``; ``int`` is arbitrary precision.
    return int(regExp.group(3))
239 
240 
def getRunNumberFromDBSName(fileName):
    """Extract the run number from a DBS file path.

    The path must contain ``<non-digits>/<digits>/<digits>/<digits>/<non-digits>``;
    the second and third digit groups are concatenated to form the run number.

    Returns:
        The run number as an int, or -1 when the pattern is not found.
    """
    regExp = re.search(r'(\D+)/(\d+)/(\d+)/(\d+)/(\D+)',fileName)
    if not regExp:
        return -1
    # String concatenation first, then conversion: "132" + "440" -> 132440.
    # Python 3 has no ``long``; ``int`` is arbitrary precision.
    return int(regExp.group(3)+regExp.group(4))
246 
247 
def getNewRunList(fromDir,lastUploadedIOV):
    """Return the ``.txt`` files in ``fromDir`` whose run number is newer than ``lastUploadedIOV``.

    Parameters:
        fromDir:         directory handed to ``ls`` together with ".txt".
        lastUploadedIOV: only files whose run number (as parsed by
                         getRunNumberFromFileName) is strictly greater are kept.

    Returns:
        List of file names, in the order ``ls`` returned them.
    """
    return [fileName
            for fileName in ls(fromDir,".txt")
            if getRunNumberFromFileName(fileName) > lastUploadedIOV]
257 
258 
def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
    """Pick the result files whose runs are complete enough to be uploaded.

    Every file of ``newRunList`` is read to collect the lumis processed per
    run.  Runs are then visited in Run Registry order; a run's files are
    selected when the processed lumis agree with DBS or with the Run Registry
    within the configured tolerances.  The walk stops at the first run that
    cannot be accepted yet, so only the runs before it are returned.

    Parameters:
        listOfRunsAndLumiFromDBS: dict run -> list of lumis known to DBS.
        listOfRunsAndLumiFromRR:  dict run -> list of [first, last] lumi
                                  ranges from the Run Registry (empty lists
                                  are skipped).
        newRunList:               result file names, relative to runListDir.
        runListDir:               directory prefix prepended to each file name.
        dataSet:                  comma-separated dataset names used to count
                                  the files of a run in DBS.
        mailList:                 recipients handed to sendEmail.
        dbsTolerance:             max number of unprocessed lumis tolerated.
        dbsTolerancePercent:      max percentage of unprocessed lumis tolerated.
        rrTolerance:              max lumis of a fully missing run that may be
                                  ignored once its timeout expired.
        missingFilesTolerance:    max missing files covered by the
                                  "DBS_MISMATCH" timeout.
        missingLumisTimeout:      timeout value handed to timeoutManager.

    Returns:
        List of file names (from ``newRunList``) to process.
    """
    runsAndLumisProcessed = {}
    runsAndFiles = {}
    for fileName in newRunList:
        # The with-statement closes the file even when exit() is called below.
        with open(runListDir+fileName) as resultFile:
            for line in resultFile:
                if line.find("Runnumber") != -1:
                    run = int(line.replace('\n','').split(' ')[1])
                elif line.find("LumiRange") != -1:
                    lumiLine = line.replace('\n','').split(' ')
                    begLumi = int(lumiLine[1])
                    endLumi = int(lumiLine[3])
                    if begLumi != endLumi:
                        error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
                        exit(error)
                    else:
                        if not run in runsAndLumisProcessed:
                            runsAndLumisProcessed[run] = []
                        if begLumi in runsAndLumisProcessed[run]:
                            print("Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!")
                        else:
                            runsAndLumisProcessed[run].append(begLumi)
        # One entry per file, filed under the last run number seen in it.
        if not run in runsAndFiles:
            runsAndFiles[run] = []
        runsAndFiles[run].append(fileName)

    # dict.keys() is a view without .sort() on Python 3, hence sorted().
    rrKeys = sorted(listOfRunsAndLumiFromRR.keys())
    dbsKeys = sorted(listOfRunsAndLumiFromDBS.keys())
    #I remove the last entry from DBS since I am not sure it is an already closed run!
    lastUnclosedRun = dbsKeys.pop()
    procKeys = sorted(runsAndLumisProcessed.keys())

    filesToProcess = []
    for run in rrKeys:
        # Expand the [first, last] ranges of this run into single lumis.
        RRList = []
        for lumiRange in listOfRunsAndLumiFromRR[run]:
            if lumiRange != []:
                for l in range(lumiRange[0],lumiRange[1]+1):
                    RRList.append(int(l))
        if run in procKeys and run < lastUnclosedRun:
            if not run in dbsKeys and run != lastUnclosedRun:
                error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!"
                exit(error)
            print("Working on run " + str(run))
            # Take the file count from the first dataset that knows the run.
            nFiles = 0
            for data in dataSet.split(','):
                nFiles = getNumberOfFilesToProcessForRun(data,run)
                if nFiles != 0:
                    break
            if len(runsAndFiles[run]) < nFiles:
                print("I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run))
                if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
                    timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
                    timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        print("WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!")
                    else:
                        if timeoutType == -1:
                            print("WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!")
                        else:
                            print("WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress.")
                        return filesToProcess
                else:
                    timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
                        sendEmail(mailList,error)
                        return filesToProcess
                    else:
                        if timeoutType == -1:
                            print("WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!")
                        else:
                            print("WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress.")
                        return filesToProcess
            else:
                timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
                timeoutManager("DBS_MISMATCH_Run"+str(run))
                print("I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!")

            errors = []
            badProcessed = []
            #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
            badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
            for i in range(0,len(errors)):
                errors[i] = errors[i].replace("listA","the processed lumis")
                errors[i] = errors[i].replace("listB","DBS")
            if len(badDBS) != 0:
                print("This is weird because I processed more lumis than the ones that are in DBS!")
            if len(badDBSProcessed) != 0 and run in rrKeys:
                #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
                badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
                for i in range(0,len(errors)):
                    errors[i] = errors[i].replace("listA","the processed lumis")
                    errors[i] = errors[i].replace("listB","Run Registry")

                if len(badRRProcessed) != 0:
                    print("I have not processed some of the lumis that are in the run registry for run: " + str(run))
                    # A lumi is really missing only if both DBS and the Run
                    # Registry list it and it was not processed.
                    for lumi in badDBSProcessed:
                        if lumi in badRRProcessed:
                            badProcessed.append(lumi)
                    lenA = len(badProcessed)
                    lenB = len(RRList)
                    if 100.*lenA/lenB <= dbsTolerancePercent:
                        print("WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis")
                        badProcessed = []
                    elif lenA <= dbsTolerance:
                        print("WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis")
                        badProcessed = []
                    else:
                        error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
                        sendEmail(mailList,error)
                        print(error)
                        return filesToProcess
                elif len(errors) != 0:
                    print("The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!")

            #If I get here it means that I passed or the DBS or the RR test
            if len(badProcessed) == 0:
                for processedFile in runsAndFiles[run]:
                    filesToProcess.append(processedFile)
            else:
                print("This should never happen because if I have errors I return or exit! Run: " + str(run))
        else:
            error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
            print(error)
            timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
            if timeoutType == 1:
                if len(RRList) <= rrTolerance:
                    error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
                    print(error)
                else:
                    error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
                    sendEmail(mailList,error)
                    return filesToProcess
            else:
                if timeoutType == -1:
                    print("WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!")
                else:
                    print("WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress.")
                return filesToProcess

    return filesToProcess
435 
def compareLumiLists(listA,listB,errors=None,tolerance=0):
    """Report the lumis that appear in only one of two lists.

    Both input lists are sorted in place.

    Parameters:
        listA:     first list of lumis (modified: sorted in place).
        listB:     second list of lumis (modified: sorted in place).
        errors:    list that receives one message per discrepancy; a fresh
                   list is used when omitted.
        tolerance: percentage of len(listB) by which listA may be shorter
                   before the length-mismatch message is added.

    Returns:
        (badA, badB): badA holds the lumis of listB missing from listA, badB
        the lumis of listA missing from listB, each in sorted order.
    """
    # A mutable default would be shared (and keep growing) across calls.
    if errors is None:
        errors = []

    lenA = len(listA)
    lenB = len(listB)
    if lenA < lenB-(lenB*float(tolerance)/100):
        errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")

    listA.sort()
    listB.sort()

    # Sets give constant-time membership tests instead of a list scan per lumi.
    inA = set(listA)
    inB = set(listB)
    badA = []
    badB = []
    for lumi in listA:
        if lumi not in inB:
            errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
            badB.append(lumi)
    for lumi in listB:
        if lumi not in inA:
            errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
            badA.append(lumi)

    return badA,badB
473 
474 
def removeUncompleteRuns(newRunList,dataSet):
    """Print, for each run in ``newRunList``, whether all its DBS files were processed.

    Parameters:
        newRunList: result file names; the run is parsed from each name with
                    getRunNumberFromFileName.
        dataSet:    dataset name used to count the files of a run in DBS.

    Nothing is returned and no input is modified; the function only reports.
    """
    # Number of result files found per run.
    fileCountPerRun = {}
    for name in newRunList:
        runNumber = getRunNumberFromFileName(name)
        fileCountPerRun[runNumber] = fileCountPerRun.get(runNumber, 0) + 1

    for runNumber, nProcessed in fileCountPerRun.items():
        nFiles = getNumberOfFilesToProcessForRun(dataSet,runNumber)
        if nProcessed >= nFiles:
            print("All files have been processed for run: " + str(runNumber) + " (" + str(nProcessed) + " out of " + str(nFiles) + ")")
        else:
            print("I haven't processed all files yet : " + str(nProcessed) + " out of " + str(nFiles) + " for run: " + str(runNumber))
489 
490 
def aselectFilesToProcess(listOfFilesToProcess,newRunList):
    """Select the result files of closed runs whose file count matches DBS.

    Parameters:
        listOfFilesToProcess: DBS file names; the run is parsed from each with
                              getRunNumberFromDBSName.
        newRunList:           result file names; the run is parsed from each
                              with getRunNumberFromFileName.

    Returns:
        The entries of ``newRunList`` that belong to runs not newer than the
        last closed run and for which as many result files exist as DBS files.

    Exits (through ``exit``) when a processed run is unknown to DBS or when
    the two file counts differ.
    """
    selectedFiles = []
    runsToProcess = {}
    processedRuns = {}
    for dbsFile in listOfFilesToProcess:
        run = getRunNumberFromDBSName(dbsFile)
        runsToProcess[run] = runsToProcess.get(run, 0) + 1

    for resultFile in newRunList:
        run = getRunNumberFromFileName(resultFile)
        processedRuns[run] = processedRuns.get(run, 0) + 1

    #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
    lastClosedRun = getLastClosedRun(listOfFilesToProcess)

    for run in sorted(processedRuns.keys()):
        if run <= lastClosedRun :
            # Check membership before reading runsToProcess[run]; otherwise a
            # KeyError would pre-empt the explanatory error below.
            if not run in runsToProcess:
                exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
            print("For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!")
            # NOTE(review): getDBSLumiListForRun is not defined in the visible
            # part of this file and its result was never used; the call is
            # kept in case it has side effects - confirm and remove.
            getDBSLumiListForRun(run)
            if processedRuns[run] == runsToProcess[run]:
                for resultFile in newRunList:
                    if run == getRunNumberFromFileName(resultFile):
                        selectedFiles.append(resultFile)
            else:
                exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
    return selectedFiles
530 
531 
def main():
    """Run the whole beam spot upload workflow.

    Steps, in order: parse the command line, take the lock (if requested),
    read the configuration file, find the last uploaded IOV, copy the new
    result files to the archive, select the files whose runs are complete
    according to DBS and the Run Registry (or the JSON fallback file), build
    the weighted payloads, write one sqlite file per payload and append it to
    a combined sqlite file, write the drop-box metadata file, optionally
    upload, archive the products and release the lock.

    Most failures end the script through ``exit`` with an explanatory message.
    """
    option,args = parse(__doc__)

    if option.lock:
        setLockName('.' + option.lock)
        if checkLock():
            print("There is already a megascript runnning...exiting")
            return
        else:
            lock()

    destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
    if option.Test:
        destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'

    cfgFile = "BeamSpotWorkflow.cfg"
    if option.cfg:
        cfgFile = option.cfg
    # Without this check an unset variable surfaces as an obscure TypeError
    # from the string concatenations below.
    cmsswBase = os.getenv("CMSSW_BASE")
    if cmsswBase is None:
        exit("ERROR: The CMSSW_BASE environment variable is not set!")
    configurationFile = cmsswBase + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
    configuration = ConfigParser.ConfigParser()
    print('Reading configuration from ', configurationFile)
    configuration.read(configurationFile)

    sourceDir = configuration.get('Common','SOURCE_DIR')
    archiveDir = configuration.get('Common','ARCHIVE_DIR')
    workingDir = configuration.get('Common','WORKING_DIR')
    databaseTag = configuration.get('Common','DBTAG')
    dataSet = configuration.get('Common','DATASET')
    fileIOVBase = configuration.get('Common','FILE_IOV_BASE')
    dbIOVBase = configuration.get('Common','DB_IOV_BASE')
    dbsTolerance = float(configuration.get('Common','DBS_TOLERANCE'))
    dbsTolerancePercent = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
    rrTolerance = float(configuration.get('Common','RR_TOLERANCE'))
    missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
    missingLumisTimeout = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
    jsonFileName = configuration.get('Common','JSON_FILE')
    mailList = configuration.get('Common','EMAIL')

    # Directories are normalised to end with '/'.  Indexing (rather than
    # str.endswith) is deliberate: an empty setting raises IndexError instead
    # of silently turning into "/", which the "rm -f" below would then hit.
    if sourceDir[-1] != '/':
        sourceDir = sourceDir + '/'
    if not dirExists(sourceDir):
        error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
        sendEmail(mailList,error)
        exit(error)

    if archiveDir[-1] != '/':
        archiveDir = archiveDir + '/'
    if not os.path.isdir(archiveDir):
        os.mkdir(archiveDir)

    if workingDir[-1] != '/':
        workingDir = workingDir + '/'
    if not os.path.isdir(workingDir):
        os.mkdir(workingDir)
    else:
        # Start from an empty working directory.
        os.system("rm -f "+ workingDir + "*")

    print("Getting last IOV for tag: " + databaseTag)
    lastUploadedIOV = 1
    if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT":
        # Falls back to getLastUploadedIOV's own default database.
        lastUploadedIOV = getLastUploadedIOV(databaseTag)
    else:
        lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)

    if dbIOVBase == "lumiid":
        lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]

    print("Getting list of files processed after IOV " + str(lastUploadedIOV))
    newProcessedRunList = getNewRunList(sourceDir,lastUploadedIOV)
    if len(newProcessedRunList) == 0:
        exit("There are no new runs after " + str(lastUploadedIOV))

    print("Copying files to archive directory")
    copiedFiles = []
    for i in range(3):
        copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)
        if len(copiedFiles) == len(newProcessedRunList):
            break
    if len(copiedFiles) != len(newProcessedRunList):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList))
        sendEmail(mailList,error)
        exit(error)

    print("Getting list of files from DBS")
    listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
    if len(listOfRunsAndLumiFromDBS) == 0:
        exit("There are no files in DBS to process")
    print("Getting list of files from RR")
    listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromRR(lastUploadedIOV)
    if not listOfRunsAndLumiFromRR:
        print("Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName)
        listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName)

    print("Getting list of files to process")
    selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
    if len(selectedFilesToProcess) == 0:
        exit("There are no files to process")

    print("Copying files from archive to working directory")
    copiedFiles = []
    for i in range(3):
        copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)
        if len(copiedFiles) == len(selectedFilesToProcess):
            break
        else:
            # NOTE(review): this removes the working directory itself, not
            # only its content - confirm that cp() recreates it on retry.
            subprocess.getstatusoutput("rm -rf " + workingDir)
    if len(copiedFiles) != len(selectedFilesToProcess):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir
        sendEmail(mailList,error)
        exit(error)

    print("Sorting and cleaning beamlist")
    beamSpotObjList = []
    for fileName in copiedFiles:
        readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)

    sortAndCleanBeamList(beamSpotObjList,fileIOVBase)

    if len(beamSpotObjList) == 0:
        error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
        exit(error)

    payloadFileName = "PayloadFile.txt"

    runBased = False
    if dbIOVBase == "runnumber":
        runBased = True

    payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
    if len(payloadList) == 0:
        error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
        exit(error)

    tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
    tmpSqliteFileName = workingDir + "SingleTmpSqliteFile.db"

    writeDBTemplate = cmsswBase + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
    readDBTemplate = cmsswBase + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
    iovSinceFirst = '0'
    iovTillLast = '0'

    #Creating the final name for the combined sqlite file
    uuid = subprocess.getstatusoutput('uuidgen -t')[1]
    final_sqlite_file_name = databaseTag + '@' + uuid
    sqlite_file = workingDir + final_sqlite_file_name + ".db"
    metadata_file = workingDir + final_sqlite_file_name + ".txt"

    for payloadNumber, payload in enumerate(payloadList):
        if option.zlarge:
            payload.sigmaZ = 10
            payload.sigmaZerr = 2.5e-05
        # The ``file`` builtin no longer exists on Python 3; open() inside a
        # with-statement also guarantees the dump is flushed and closed before
        # the sqlite file is written from it.
        with open(tmpPayloadFileName,'w') as tmpFile:
            dumpValues(payload,tmpFile)
        if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
            error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
            exit(error)
        readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)

        # IOV range of this payload, in the time type used by the database.
        if dbIOVBase == "runnumber":
            iov_since = str(payload.Run)
            iov_till = iov_since
        elif dbIOVBase == "lumiid":
            iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
            iov_till = str( pack(int(payload.Run), int(payload.IOVlast)) )
        elif dbIOVBase == "timestamp":
            error = "ERROR: IOV " + dbIOVBase + " still not implemented."
            exit(error)
        else:
            error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
            exit(error)

        # Remember the overall range covered by the combined file.
        if payloadNumber == 0:
            iovSinceFirst = iov_since
        if payloadNumber == len(payloadList)-1:
            iovTillLast = iov_till

        appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
        os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)

    print(" create MERGED payload card for dropbox ...")

    print("WARNING TAG TYPE forced to be just offline")
    tagType = "offline"
    checkType = tagType
    if tagType == "express":
        checkType = "hlt"

    with open(metadata_file,'w') as dfile:
        dfile.write('destDB ' + destDB +'\n')
        dfile.write('tag ' + databaseTag +'\n')
        dfile.write('inputtag' +'\n')
        dfile.write('since ' + iovSinceFirst +'\n')
        dfile.write('Timetype '+ dbIOVBase +'\n')
        dfile.write('IOVCheck ' + checkType + '\n')
        dfile.write('usertext Beam spot position\n')

    if option.upload:
        print(" scp files to offline Drop Box")
        dropbox = "/DropBox"
        if option.Test:
            dropbox = "/DropBox_test"
            print("UPLOADING TO TEST DB")
        uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)

    archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
    archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
    if not os.path.isdir(archiveDir + 'payloads'):
        os.mkdir(archiveDir + 'payloads')
    subprocess.getstatusoutput('mv ' + sqlite_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
    subprocess.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
    subprocess.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)

    print(archiveDir + "payloads/" + archive_sqlite_file_name + '.db')
    print(archiveDir + "payloads/" + archive_sqlite_file_name + '.txt')

    rmLock()


if __name__ == '__main__':
    main()
def pack(high, low)
vector< string > parse(string line, const string &delimiter)
def readBeamSpotFile(fileName, listbeam=[], IOVbase="runbase", firstRun='1', lastRun='4999999999')
def dumpValues(beam, file)
def getRunNumberFromFileName(fileName)
def getLastClosedRun(DBSListOfFiles)
def setLockName(name)
def replace(string, replacements)
def appendSqliteFile(combinedSqliteFileName, sqliteFileName, tagName, IOVSince, IOVTill, tmpDir="/tmp/")
def createWeightedPayloads(fileName, listbeam=[], weighted=True)
CREATE FILE FOR PAYLOADS.
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
def uploadSqliteFile(sqliteFileDirName, sqliteFileName, dropbox="/DropBox")
def getLastUploadedIOV(tagName, destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT")
General functions.
def getListOfRunsAndLumiFromFile(firstRun=-1, fileName="")
def compareLumiLists(listA, listB, errors=[], tolerance=0)
def sortAndCleanBeamList(listbeam=[], IOVbase="lumibase")
Sort and clean list of data for consecutive duplicates and bad fits.
def getNewRunList(fromDir, lastUploadedIOV)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def readSqliteFile(sqliteFileName, tagName, sqliteTemplateFile, tmpDir="/tmp/")
def getNumberOfFilesToProcessForRun(dataSet, run)
def getRunNumberFromDBSName(fileName)
def removeUncompleteRuns(newRunList, dataSet)
def getListOfFilesToProcess(dataSet, lastRun=-1)
def ls(path, rec=False)
Definition: eostools.py:349
def aselectFilesToProcess(listOfFilesToProcess, newRunList)
def getListOfRunsAndLumiFromDBS(dataSet, lastRun=-1)
int dirExists(const std::string &path)
Definition: Util.h:119
def selectFilesToProcess(listOfRunsAndLumiFromDBS, listOfRunsAndLumiFromRR, newRunList, runListDir, dataSet, mailList, dbsTolerance, dbsTolerancePercent, rrTolerance, missingFilesTolerance, missingLumisTimeout)
def timeoutManager(type, timeout=-1, fileName=".timeout")
def writeSqliteFile(sqliteFileName, tagName, timeType, beamSpotFile, sqliteTemplateFile, tmpDir="/tmp/")
def sendEmail(mailList, error)
General utilities.
Definition: main.py:1
def getListOfRunsAndLumiFromRR(firstRun=-1)
#define str(s)
def exit(msg="")