CMS 3D CMS Logo

BeamSpotWorkflow.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #____________________________________________________________
3 #
4 # BeamSpotWorkflow
5 #
6 # A very complicated way to automate the beam spot workflow
7 #
8 # Francisco Yumiceva, Lorenzo Uplegger
9 # yumiceva@fnal.gov, uplegger@fnal.gov
10 #
11 # Fermilab, 2010
12 #
13 #____________________________________________________________
14 
15 """
16  BeamSpotWorkflow.py
17 
18  A very complicate script to upload the results into the DB
19 
20  usage: %prog -d <data file/directory> -t <tag name>
21  -c, --cfg = CFGFILE : Use a different configuration file than the default
22  -l, --lock = LOCK : Create a lock file to have just one script running
23  -o, --overwrite : Overwrite results files when copying.
24  -T, --Test : Upload files to Test dropbox for data validation.
25  -u, --upload : Upload files to offline drop box via scp.
26  -z, --zlarge : Enlarge sigmaZ to 10 +/- 0.005 cm.
27 
28  Francisco Yumiceva (yumiceva@fnal.gov)
29  Lorenzo Uplegger (send an email to Francisco)
30  Fermilab 2010
31 
32 """
33 
34 
35 import sys,os
36 import commands, re, time
37 import datetime
38 import ConfigParser
39 import xmlrpclib
40 from BeamSpotObj import BeamSpot
41 from IOVObj import IOV
42 from CommonMethods import *
43 
44 try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
45  import json
46 except:
47  try:
48  import simplejson as json
49  except:
50  error = "Please set a crab environment in order to get the proper JSON lib"
51  exit(error)
52 
53 #####################################################################################
54 # General functions
55 #####################################################################################
56 def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
57  #return 582088327592295
58  listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName
59  dbError = commands.getstatusoutput( listIOVCommand )
60  if dbError[0] != 0 :
61  if dbError[1].find("metadata entry \"" + tagName + "\" does not exist") != -1:
62  print "Creating a new tag because I got the following error contacting the DB"
63  print dbError[1]
64  return 1
65  #return 133928
66  else:
67  exit("ERROR: Can\'t connect to db because:\n" + dbError[1])
68 
69 
70  aCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
71  output = commands.getstatusoutput( aCommand )
72 
73  #WARNING when we pass to lumi IOV this should be long long
74  if output[1] == '':
75  exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")
76 
77  return long(output[1])
78 
79 ########################################################################
def getListOfFilesToProcess(dataSet,lastRun=-1):
    """Return the .root file names DBS lists for dataSet.

    When lastRun != -1, only files of runs strictly greater than lastRun are
    requested. Blank lines are dropped, so an empty query result gives an
    empty list instead of [''].
    """
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet
    if lastRun != -1:
        queryCommand += " and run > " + str(lastRun)
    queryCommand += "\" | grep .root"
    output = commands.getstatusoutput( queryCommand )[1]
    return [line for line in output.split('\n') if line.strip()]
88 
89 ########################################################################
def getNumberOfFilesToProcessForRun(dataSet,run):
    """Return how many .root files DBS lists for `run` in `dataSet`.

    The pipeline's exit status is the one from grep, which is non-zero when
    nothing matched, including when the DBS query itself failed. In that
    case 0 is returned.
    """
    queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
    status, output = commands.getstatusoutput( queryCommand )
    if status != 0:
        return 0
    return len(output.split('\n'))
98 
99 ########################################################################
100 def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
101  datasetList = dataSet.split(',')
102  outputList = []
103  for data in datasetList:
104  queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
105  if lastRun != -1:
106  queryCommand = queryCommand + " and run > " + str(lastRun)
107  queryCommand += "\""
108  print " >> " + queryCommand
109  output = []
110  for i in range(0,3):
111  output = commands.getstatusoutput( queryCommand )
112  if output[0] == 0 and not (output[1].find("ERROR") != -1 or output[1].find("Error") != -1) :
113  break
114  if output[0] != 0:
115  exit("ERROR: I can't contact DBS for the following reason:\n" + output[1])
116  #print output[1]
117  tmpList = output[1].split('\n')
118  for file in tmpList:
119  outputList.append(file)
120  runsAndLumis = {}
121  for out in outputList:
122  regExp = re.search('(\d+)\s+(\d+)',out)
123  if regExp:
124  run = long(regExp.group(1))
125  lumi = long(regExp.group(2))
126  if not run in runsAndLumis:
127  runsAndLumis[run] = []
128  runsAndLumis[run].append(lumi)
129 
130 # print runsAndLumis
131 # exit("ok")
132  return runsAndLumis
133 
134 #####################################################################################
def getListOfRunsAndLumiFromFile(firstRun=-1,fileName=""):
    """Read good runs and their lumi ranges from a JSON file.

    This is the fallback used when the Run Registry cannot be reached. The
    JSON object maps run numbers (as strings) to their lumi ranges, and the
    ranges are returned unchanged.

    Only runs strictly greater than firstRun are kept. This matches the
    ``{runNumber} > firstRun`` selection of the Run Registry query. The
    default of -1 keeps every run.

    Returns a dict {run (long): lumi ranges}.
    """
    with open(fileName) as jsonFile:
        jsonList = json.loads(jsonFile.read())

    selected_dcs = {}
    for element in jsonList:
        run = long(element)
        # Runs at or below firstRun are already in the DB. Returning them would
        # make selectFilesToProcess see Run Registry runs that were "never processed".
        if run > firstRun:
            selected_dcs[run] = jsonList[element]
    return selected_dcs
145 
146 ########################################################################
147 def getListOfRunsAndLumiFromRR(firstRun=-1):
148  RunReg ="http://pccmsdqm04.cern.ch/runregistry"
149  #RunReg = "http://localhost:40010/runregistry"
150  #Dataset=%Online%
151  Group = "Collisions10"
152 
153  # get handler to RR XML-RPC server
154  FULLADDRESS=RunReg + "/xmlrpc"
155  #print "RunRegistry from: ",FULLADDRESS
156  server = xmlrpclib.ServerProxy(FULLADDRESS)
157  #sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun) + " and {datasetName} LIKE '" + Dataset + "'"
158  sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun)
159  #sel_dcstable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"
160 
161  maxAttempts = 3;
162  tries = 0;
163  while tries<maxAttempts:
164  try:
165  run_data = server.DataExporter.export('RUN' , 'GLOBAL', 'csv_runs', sel_runtable)
166  #dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json' , sel_dcstable)
167  break
168  except:
169  print "Something wrong in accessing runregistry, retrying in 2s....", tries, "/", maxAttempts
170  tries += 1
171  time.sleep(2)
172  if tries==maxAttempts:
173  error = "Run registry unaccessible.....exiting now"
174  return {};
175 
176 
177  listOfRuns=[]
178  for line in run_data.split("\n"):
179  run=line.split(',')[0]
180  if run.isdigit():
181  listOfRuns.append(run)
182 
183 
184  firstRun = listOfRuns[len(listOfRuns)-1];
185  lastRun = listOfRuns[0];
186  sel_dcstable="{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"
187 
188  tries = 0;
189  while tries<maxAttempts:
190  try:
191  #run_data = server.DataExporter.export('RUN' , 'GLOBAL', 'csv_runs', sel_runtable)
192  dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json' , sel_dcstable)
193  break
194  except:
195  print "I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....", tries, "/", maxAttempts
196  tries += 1
197  time.sleep(2)
198  if tries==maxAttempts:
199  error = "Run registry unaccessible.....exiting now"
200  return {};
201 
202  selected_dcs={}
203  jsonList=json.loads(dcs_data)
204 
205  #for element in jsonList:
206  for element in listOfRuns:
207  #if element in listOfRuns:
208  if element in jsonList:
209  selected_dcs[long(element)]=jsonList[element]
210  else:
211  print "WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!"
212  selected_dcs[long(element)]= [[]]
213  #print selected_dcs
214  return selected_dcs
215 
216 ########################################################################
def getLastClosedRun(DBSListOfFiles):
    """Return the second-highest distinct run number among DBS file names.

    The highest run may not be closed yet, so it is skipped. File names that
    getRunNumberFromDBSName cannot parse count as run -1. Returns -1 when
    fewer than two distinct runs are present ("no closed run").
    """
    # A set removes duplicates in O(n) instead of a list.count() per file.
    runs = sorted(set(getRunNumberFromDBSName(fileName) for fileName in DBSListOfFiles))
    if len(runs) <= 1: #No closed run
        return -1
    return long(runs[-2])
229 
230 ########################################################################
def getRunNumberFromFileName(fileName):
    """Extract the run number from a beam-spot result file name.

    The name must contain non-digits, an underscore, digits, an underscore,
    digits and a trailing underscore. The second group of digits is taken as
    the run number, e.g. "BeamFit_1_136100_5.txt" gives 136100.

    Returns the run as a long, or -1 when the name does not match.
    """
    # Former, stricter pattern: '(\D+)_(\d+)_(\d+)_(\d+)'
    regExp = re.search(r'(\D+)_(\d+)_(\d+)_', fileName)
    if not regExp:
        return -1
    return long(regExp.group(3))
237 
238 ########################################################################
def getRunNumberFromDBSName(fileName):
    """Extract the run number from a DBS logical file name.

    Looks for three purely numeric path components, preceded and followed by
    non-digit text, e.g. ".../Run/000/136/100/file.root". The run number is
    the concatenation of the 2nd and 3rd numeric components (136100 in the
    example).

    Returns the run as a long, or -1 when the name does not match.
    """
    regExp = re.search(r'(\D+)/(\d+)/(\d+)/(\d+)/(\D+)', fileName)
    if not regExp:
        return -1
    return long(regExp.group(3) + regExp.group(4))
244 
245 ########################################################################
def getNewRunList(fromDir,lastUploadedIOV):
    """Return the .txt result files in fromDir whose run is newer than lastUploadedIOV.

    Files are listed with ls(fromDir, ".txt") and the run number is taken
    from each name with getRunNumberFromFileName. Names it cannot parse yield
    -1 and are therefore never selected for a non-negative IOV.
    """
    return [fileName for fileName in ls(fromDir,".txt")
            if getRunNumberFromFileName(fileName) > lastUploadedIOV]
255 
256 ########################################################################
def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
    """Decide which processed result files can be turned into payloads.

    First every result file in newRunList (read from runListDir) is parsed.
    A "Runnumber" line sets the current run, and each "LumiRange" line adds
    one processed lumi for that run. Each file is also recorded under its run.

    The Run Registry runs are then walked in ascending order. For each run,
    the processed files and lumis are checked against DBS and the Run
    Registry, using the configured tolerances and timeouts (timeoutManager).
    As soon as a run cannot be accepted, the files accepted so far (all from
    earlier runs) are returned. Later runs are therefore never uploaded ahead
    of a missing one.

    The script exits on inconsistencies that should be impossible: a lumi
    range spanning more than one lumi, or a processed run that is in the Run
    Registry but not in DBS.

    Parameters:
      listOfRunsAndLumiFromDBS: {run: [lumi, ...]} from DBS.
      listOfRunsAndLumiFromRR: {run: [[firstLumi, lastLumi], ...]} of good lumis.
      newRunList: result file names to consider.
      runListDir: directory containing the files in newRunList.
      dataSet: comma-separated dataset names used to count the run's DBS files.
      mailList: recipients for sendEmail on blocking errors.
      dbsTolerance: absolute number of unprocessed lumis still accepted.
      dbsTolerancePercent: percentage of the Run Registry lumis that may be
        left unprocessed and still accepted.
      rrTolerance: maximum number of lumis a never-processed Run Registry run
        may have to be skipped once its timeout has expired.
      missingFilesTolerance: maximum number of files missing vs DBS for the
        "small mismatch" path.
      missingLumisTimeout: timeout passed to timeoutManager (seconds, judging
        by the "/3600 ... hours" error message).

    Returns the list of result file names to process.
    """
    runsAndLumisProcessed = {}
    runsAndFiles = {}
    for fileName in newRunList:
        file = open(runListDir+fileName)
        for line in file:
            if line.find("Runnumber") != -1:
                # Assumes the run is the 2nd space-separated token of the line.
                run = long(line.replace('\n','').split(' ')[1])
            elif line.find("LumiRange") != -1:
                # Assumes tokens 1 and 3 are the first/last lumi (e.g. "LumiRange b - e").
                lumiLine = line.replace('\n','').split(' ')
                begLumi = long(lumiLine[1])
                endLumi = long(lumiLine[3])
                if begLumi != endLumi:
                    error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
                    exit(error)
                else:
                    if not run in runsAndLumisProcessed:
                        runsAndLumisProcessed[run] = []
                    if begLumi in runsAndLumisProcessed[run]:
                        print "Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!"
                    else:
                        runsAndLumisProcessed[run].append(begLumi)
        # One entry per file: compared against the number of DBS files of the run.
        if not run in runsAndFiles:
            runsAndFiles[run] = []
        runsAndFiles[run].append(fileName)
        file.close()

    rrKeys = sorted(listOfRunsAndLumiFromRR.keys())
    dbsKeys = listOfRunsAndLumiFromDBS.keys()
    dbsKeys.sort()
    # Drop the highest DBS run: it may not be closed yet, so it is never processed here.
    lastUnclosedRun = dbsKeys.pop()
    procKeys = runsAndLumisProcessed.keys()
    procKeys.sort()
    filesToProcess = []
    for run in rrKeys:
        # Expand the Run Registry [first, last] ranges into individual lumis.
        RRList = []
        for lumiRange in listOfRunsAndLumiFromRR[run]:
            if lumiRange != []:
                for l in range(lumiRange[0],lumiRange[1]+1):
                    RRList.append(long(l))
        if run in procKeys and run < lastUnclosedRun:
            # Here run < lastUnclosedRun, so the second condition is always true.
            if not run in dbsKeys and run != lastUnclosedRun:
                error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!"
                exit(error)
            print "Working on run " + str(run)
            # Use the file count of the first dataset that has files for this run.
            nFiles = 0
            for data in dataSet.split(','):
                nFiles = getNumberOfFilesToProcessForRun(data,run)
                if nFiles != 0:
                    break
            if len(runsAndFiles[run]) < nFiles:
                print "I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run)
                if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
                    # Small mismatch: wait for the DBS_MISMATCH timeout, then carry on.
                    timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
                    timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        print "WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!"
                    else:
                        if timeoutType == -1:
                            print "WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
                        else:
                            print "WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress."
                        return filesToProcess
                else:
                    # Large mismatch: always stop here; email once the timeout has expired.
                    timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
                        sendEmail(mailList,error)
                        return filesToProcess
                    else:
                        if timeoutType == -1:
                            print "WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
                        else:
                            print "WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress."
                        return filesToProcess

            else:
                # All files processed: clear both mismatch timeouts for this run.
                timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
                timeoutManager("DBS_MISMATCH_Run"+str(run))
                print "I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!"
            errors = []
            badProcessed = []
            badDBSProcessed = []
            badDBS = []
            badRRProcessed = []
            badRR = []
            # The processed lumis must be passed first: compareLumiLists only flags
            # a count error when its first list is the shorter one.
            badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
            for i in range(0,len(errors)):
                errors[i] = errors[i].replace("listA","the processed lumis")
                errors[i] = errors[i].replace("listB","DBS")
            if len(badDBS) != 0:
                print "This is weird because I processed more lumis than the ones that are in DBS!"
            if len(badDBSProcessed) != 0 and run in rrKeys:
                # Some DBS lumis were not processed: only those also in the Run Registry matter.
                lastError = len(errors)  # NOTE(review): assigned but never used
                # Same argument order as above (processed lumis first).
                badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
                for i in range(0,len(errors)):
                    errors[i] = errors[i].replace("listA","the processed lumis")
                    errors[i] = errors[i].replace("listB","Run Registry")

                if len(badRRProcessed) != 0:
                    print "I have not processed some of the lumis that are in the run registry for run: " + str(run)
                    # Unprocessed lumis that are both in DBS and in the Run Registry.
                    for lumi in badDBSProcessed:
                        if lumi in badRRProcessed:
                            badProcessed.append(lumi)
                    lenA = len(badProcessed)
                    # Non-zero here: badRRProcessed non-empty implies RRList non-empty.
                    lenB = len(RRList)
                    if 100.*lenA/lenB <= dbsTolerancePercent:
                        print "WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
                        badProcessed = []
                    elif lenA <= dbsTolerance:
                        print "WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
                        badProcessed = []
                    else:
                        error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
                        sendEmail(mailList,error)
                        print error
                        return filesToProcess
            elif len(errors) != 0:
                print "The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!"

            # Reaching this point means the run passed either the DBS or the Run Registry check.
            if len(badProcessed) == 0:
                for file in runsAndFiles[run]:
                    filesToProcess.append(file)
            else:
                print "This should never happen because if I have errors I return or exit! Run: " + str(run)
        else:
            # Either the run was never processed, or it is >= the possibly-unclosed
            # last DBS run. Both cases are reported with the same message.
            error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
            print error
            timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
            if timeoutType == 1:
                if len(RRList) <= rrTolerance:
                    error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
                    print error
                else:
                    error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
                    sendEmail(mailList,error)
                    return filesToProcess
            else:
                if timeoutType == -1:
                    print "WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!"
                else:
                    print "WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress."
                return filesToProcess

    return filesToProcess
433 ########################################################################
def compareLumiLists(listA,listB,errors=None,tolerance=0):
    """Compare two lists of lumi sections.

    Both lists are sorted in place. Human-readable messages are appended to
    ``errors``; a fresh list is used when none is given. Two kinds of
    message are produced:
      * a count error when listA is shorter than listB by more than
        ``tolerance`` percent of len(listB);
      * one message per lumi that is only in one of the two lists.

    Returns (badA, badB). badA holds the lumis of listB that are missing from
    listA, and badB the lumis of listA that are missing from listB. Both are
    in sorted order.
    """
    if errors is None:
        # A shared mutable default ([]) would accumulate messages across calls.
        errors = []
    lenA = len(listA)
    lenB = len(listB)
    if lenA < lenB-(lenB*float(tolerance)/100):
        errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")
    # Sorting in place is kept: it fixes the order of the returned lists and messages.
    listA.sort()
    listB.sort()
    # Sets give O(1) membership tests instead of scanning the other list each time.
    setA = set(listA)
    setB = set(listB)
    badA = []
    badB = []
    for lumi in listA:
        if lumi not in setB:
            errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
            badB.append(lumi)
    for lumi in listB:
        if lumi not in setA:
            errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
            badA.append(lumi)

    return badA,badB
471 
472 ########################################################################
473 def removeUncompleteRuns(newRunList,dataSet):
474  processedRuns = {}
475  for fileName in newRunList:
476  run = getRunNumberFromFileName(fileName)
477  if not run in processedRuns:
478  processedRuns[run] = 0
479  processedRuns[run] += 1
480 
481  for run in processedRuns.keys():
482  nFiles = getNumberOfFilesToProcessForRun(dataSet,run)
483  if processedRuns[run] < nFiles:
484  print "I haven't processed all files yet : " + str(processedRuns[run]) + " out of " + str(nFiles) + " for run: " + str(run)
485  else:
486  print "All files have been processed for run: " + str(run) + " (" + str(processedRuns[run]) + " out of " + str(nFiles) + ")"
487 
488 ########################################################################
489 def aselectFilesToProcess(listOfFilesToProcess,newRunList):
490  selectedFiles = []
491  runsToProcess = {}
492  processedRuns = {}
493  for file in listOfFilesToProcess:
494  run = getRunNumberFromDBSName(file)
495 # print "To process: " + str(run)
496  if run not in runsToProcess:
497  runsToProcess[run] = 1
498  else:
499  runsToProcess[run] = runsToProcess[run] + 1
500 
501  for file in newRunList:
502  run = getRunNumberFromFileName(file)
503 # print "Processed: " + str(run)
504  if run not in processedRuns:
505  processedRuns[run] = 1
506  else:
507  processedRuns[run] = processedRuns[run] + 1
508 
509  #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
510  lastClosedRun = getLastClosedRun(listOfFilesToProcess)
511 # print "LastClosedRun:-" + str(lastClosedRun) + "-"
512 
513  processedRunsKeys = sorted(processedRuns.keys())
514 
515  for run in processedRunsKeys:
516  if run <= lastClosedRun :
517  print "For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!"
518  if not run in runsToProcess:
519  exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
520  lumiList = getDBSLumiListForRun(run)
521  if processedRuns[run] == runsToProcess[run]:
522  for file in newRunList:
523  if run == getRunNumberFromFileName(file):
524  selectedFiles.append(file)
525  else:
526  exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
527  return selectedFiles
528 
529 ########################################################################
def main():
    """Run the beam-spot upload workflow end to end.

    The steps are:
      1. Take an optional lock.
      2. Read the configuration file.
      3. Find the result files newer than the last IOV in the DB and archive them.
      4. Cross-check them against DBS and the Run Registry (or the JSON fallback).
      5. Build weighted payloads, write them into one combined sqlite file
         plus a metadata card.
      6. Optionally upload both to the DropBox.
      7. Move the outputs into ARCHIVE_DIR/payloads.
    """
    ######### COMMAND LINE OPTIONS ##############
    option,args = parse(__doc__)

    ######### Check if there is already a megascript running ########
    # NOTE(review): after lock(), the many exit(...) calls below leave without reaching
    # rmLock() at the end. Confirm whether CommonMethods' lock handles stale locks.
    if option.lock:
        setLockName('.' + option.lock)
        if checkLock():
            print "There is already a megascript runnning...exiting"
            return
        else:
            lock()


    destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
    if option.Test:
        destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'

    ######### CONFIGURATION FILE ################
    cfgFile = "BeamSpotWorkflow.cfg"
    if option.cfg:
        cfgFile = option.cfg
    configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
    configuration = ConfigParser.ConfigParser()
    print 'Reading configuration from ', configurationFile
    configuration.read(configurationFile)

    sourceDir = configuration.get('Common','SOURCE_DIR')
    archiveDir = configuration.get('Common','ARCHIVE_DIR')
    workingDir = configuration.get('Common','WORKING_DIR')
    databaseTag = configuration.get('Common','DBTAG')
    dataSet = configuration.get('Common','DATASET')
    fileIOVBase = configuration.get('Common','FILE_IOV_BASE')
    dbIOVBase = configuration.get('Common','DB_IOV_BASE')
    dbsTolerance = float(configuration.get('Common','DBS_TOLERANCE'))
    dbsTolerancePercent = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
    rrTolerance = float(configuration.get('Common','RR_TOLERANCE'))
    missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
    missingLumisTimeout = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
    jsonFileName = configuration.get('Common','JSON_FILE')
    mailList = configuration.get('Common','EMAIL')

    ######### DIRECTORIES SETUP #################
    if sourceDir[len(sourceDir)-1] != '/':
        sourceDir = sourceDir + '/'
    if not dirExists(sourceDir):
        error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
        sendEmail(mailList,error)
        exit(error)

    if archiveDir[len(archiveDir)-1] != '/':
        archiveDir = archiveDir + '/'
    if not os.path.isdir(archiveDir):
        os.mkdir(archiveDir)

    # The working directory is emptied on every run.
    if workingDir[len(workingDir)-1] != '/':
        workingDir = workingDir + '/'
    if not os.path.isdir(workingDir):
        os.mkdir(workingDir)
    else:
        os.system("rm -f "+ workingDir + "*")


    print "Getting last IOV for tag: " + databaseTag
    lastUploadedIOV = 1
    # For the production destDB the last IOV is read through getLastUploadedIOV's
    # default connection string (cms_orcoff_prod), not through destDB itself.
    if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT":
        lastUploadedIOV = getLastUploadedIOV(databaseTag)
    else:
        lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)

    # With lumi-based IOVs only the run part of the packed value is used from here on.
    if dbIOVBase == "lumiid":
        lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]

    ######### Get list of files processed after the last IOV
    print "Getting list of files processed after IOV " + str(lastUploadedIOV)
    newProcessedRunList = getNewRunList(sourceDir,lastUploadedIOV)
    if len(newProcessedRunList) == 0:
        exit("There are no new runs after " + str(lastUploadedIOV))

    ######### Copy files to archive directory (up to 3 attempts)
    print "Copying files to archive directory"
    copiedFiles = []
    for i in range(3):
        copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)
        if len(copiedFiles) == len(newProcessedRunList):
            break;
    if len(copiedFiles) != len(newProcessedRunList):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList))
        sendEmail(mailList,error)
        exit(error)


    ######### Get from DBS the list of files after last IOV
    print "Getting list of files from DBS"
    listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
    if len(listOfRunsAndLumiFromDBS) == 0:
        exit("There are no files in DBS to process")
    print "Getting list of files from RR"
    listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromRR(lastUploadedIOV)
    # An empty answer from the Run Registry triggers the JSON-file fallback.
    if(not listOfRunsAndLumiFromRR):
        print "Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName
        listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName)
    ######### Get list of files to process for DB
    print "Getting list of files to process"
    selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
    if len(selectedFilesToProcess) == 0:
        exit("There are no files to process")

    ######### Copy files to working directory (up to 3 attempts)
    print "Copying files from archive to working directory"
    copiedFiles = []
    for i in range(3):
        copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)
        if len(copiedFiles) == len(selectedFilesToProcess):
            break;
        else:
            # NOTE(review): this removes the working directory itself, not just its
            # contents. Confirm that cp() recreates it on the next attempt.
            commands.getstatusoutput("rm -rf " + workingDir)
    if len(copiedFiles) != len(selectedFilesToProcess):
        error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir
        sendEmail(mailList,error)
        exit(error)

    print "Sorting and cleaning beamlist"
    beamSpotObjList = []
    for fileName in copiedFiles:
        readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)

    sortAndCleanBeamList(beamSpotObjList,fileIOVBase)

    if len(beamSpotObjList) == 0:
        error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
        exit(error)

    payloadFileName = "PayloadFile.txt"

    runBased = False
    if dbIOVBase == "runnumber":
        runBased = True

    payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
    if len(payloadList) == 0:
        error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
        exit(error)


    tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
    tmpSqliteFileName = workingDir + "SingleTmpSqliteFile.db"

    writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
    readDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
    payloadNumber = -1
    # First "since" and last "till" over all payloads; used in the archive file names.
    iovSinceFirst = '0';
    iovTillLast = '0';

    #Creating the final name for the combined sqlite file
    uuid = commands.getstatusoutput('uuidgen -t')[1]
    final_sqlite_file_name = databaseTag + '@' + uuid
    sqlite_file = workingDir + final_sqlite_file_name + ".db"
    metadata_file = workingDir + final_sqlite_file_name + ".txt"

    # Each payload goes through a temporary single-payload sqlite file, which is
    # then appended to the combined file with its own IOV range.
    for payload in payloadList:
        payloadNumber += 1
        if option.zlarge:
            # NOTE(review): 2.5e-05 == 0.005**2, while the usage text says
            # "10 +/- 0.005 cm". Presumably sigmaZerr holds a squared error; confirm.
            payload.sigmaZ = 10
            payload.sigmaZerr = 2.5e-05
        tmpFile = file(tmpPayloadFileName,'w')
        dumpValues(payload,tmpFile)
        tmpFile.close()
        if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
            error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
            exit(error)
        readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)

        ##############################################################
        #WARNING I am not sure if I am packing the right values
        if dbIOVBase == "runnumber":
            iov_since = str(payload.Run)
            iov_till = iov_since
        elif dbIOVBase == "lumiid":
            iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
            iov_till = str( pack(int(payload.Run), int(payload.IOVlast)) )
        elif dbIOVBase == "timestamp":
            error = "ERROR: IOV " + dbIOVBase + " still not implemented."
            exit(error)
        else:
            error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
            exit(error)

        if payloadNumber == 0:
            iovSinceFirst = iov_since
        if payloadNumber == len(payloadList)-1:
            iovTillLast = iov_till

        appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
        os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)


    #### CREATE payload for merged output

    print " create MERGED payload card for dropbox ..."

    # Metadata card that accompanies the combined sqlite file.
    dfile = open(metadata_file,'w')

    dfile.write('destDB ' + destDB +'\n')
    dfile.write('tag ' + databaseTag +'\n')
    dfile.write('inputtag' +'\n')
    dfile.write('since ' + iovSinceFirst +'\n')
    dfile.write('Timetype '+ dbIOVBase +'\n')

    ###################################################
    # WARNING tagType forced to offline, so the "express" -> "hlt" branch never runs.
    print "WARNING TAG TYPE forced to be just offline"
    tagType = "offline"
    checkType = tagType
    if tagType == "express":
        checkType = "hlt"
    dfile.write('IOVCheck ' + checkType + '\n')
    dfile.write('usertext Beam spot position\n')

    dfile.close()



    if option.upload:
        print " scp files to offline Drop Box"
        dropbox = "/DropBox"
        if option.Test:
            dropbox = "/DropBox_test"
            print "UPLOADING TO TEST DB"
        uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)

    # Keep a copy of the sqlite file, metadata card and payload text, named after the IOV span.
    archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
    archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
    if not os.path.isdir(archiveDir + 'payloads'):
        os.mkdir(archiveDir + 'payloads')
    commands.getstatusoutput('mv ' + sqlite_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
    commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
    commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)

    print archiveDir + "payloads/" + archive_sqlite_file_name + '.db'
    print archiveDir + "payloads/" + archive_sqlite_file_name + '.txt'

    rmLock()

# Script entry point.
if __name__ == '__main__':
    main()
def dirExists(dir)
def pack(high, low)
def readBeamSpotFile(fileName, listbeam=[], IOVbase="runbase", firstRun='1', lastRun='4999999999')
def dumpValues(beam, file)
def getRunNumberFromFileName(fileName)
def getLastClosedRun(DBSListOfFiles)
def setLockName(name)
def replace(string, replacements)
def appendSqliteFile(combinedSqliteFileName, sqliteFileName, tagName, IOVSince, IOVTill, tmpDir="/tmp/")
def createWeightedPayloads(fileName, listbeam=[], weighted=True)
CREATE FILE FOR PAYLOADS.
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
def uploadSqliteFile(sqliteFileDirName, sqliteFileName, dropbox="/DropBox")
def getLastUploadedIOV(tagName, destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT")
General functions.
def getListOfRunsAndLumiFromFile(firstRun=-1, fileName="")
def compareLumiLists(listA, listB, errors=[], tolerance=0)
def sortAndCleanBeamList(listbeam=[], IOVbase="lumibase")
Sort and clean list of data for consecutive duplicates and bad fits.
def getNewRunList(fromDir, lastUploadedIOV)
def readSqliteFile(sqliteFileName, tagName, sqliteTemplateFile, tmpDir="/tmp/")
def getNumberOfFilesToProcessForRun(dataSet, run)
def getRunNumberFromDBSName(fileName)
def removeUncompleteRuns(newRunList, dataSet)
def getListOfFilesToProcess(dataSet, lastRun=-1)
def ls(path, rec=False)
Definition: eostools.py:348
def aselectFilesToProcess(listOfFilesToProcess, newRunList)
def parse(path, config)
Definition: dumpparser.py:13
def getListOfRunsAndLumiFromDBS(dataSet, lastRun=-1)
def selectFilesToProcess(listOfRunsAndLumiFromDBS, listOfRunsAndLumiFromRR, newRunList, runListDir, dataSet, mailList, dbsTolerance, dbsTolerancePercent, rrTolerance, missingFilesTolerance, missingLumisTimeout)
def timeoutManager(type, timeout=-1, fileName=".timeout")
Definition: CommonMethods.py:9
def writeSqliteFile(sqliteFileName, tagName, timeType, beamSpotFile, sqliteTemplateFile, tmpDir="/tmp/")
def sendEmail(mailList, error)
General utilities.
Definition: main.py:1
def getListOfRunsAndLumiFromRR(firstRun=-1)
#define str(s)
double split
Definition: MVATrainer.cc:139