CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
BeamSpotWorkflow.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #____________________________________________________________
3 #
4 # BeamSpotWorkflow
5 #
6 # A very complicate way to automatize the beam spot workflow
7 #
8 # Francisco Yumiceva, Lorenzo Uplegger
9 # yumiceva@fnal.gov, uplegger@fnal.gov
10 #
11 # Fermilab, 2010
12 #
13 #____________________________________________________________
14 
15 """
16  BeamSpotWorkflow.py
17 
18  A very complicate script to upload the results into the DB
19 
20  usage: %prog -d <data file/directory> -t <tag name>
21  -c, --cfg = CFGFILE : Use a different configuration file than the default
22  -l, --lock = LOCK : Create a lock file to have just one script running
23  -o, --overwrite : Overwrite results files when copying.
24  -T, --Test : Upload files to Test dropbox for data validation.
25  -u, --upload : Upload files to offline drop box via scp.
26  -z, --zlarge : Enlarge sigmaZ to 10 +/- 0.005 cm.
27 
28  Francisco Yumiceva (yumiceva@fnal.gov)
29  Lorenzo Uplegger (send an email to Francisco)
30  Fermilab 2010
31 
32 """
33 
34 
35 import sys,os
36 import commands, re, time
37 import datetime
38 import ConfigParser
39 import xmlrpclib
40 from BeamSpotObj import BeamSpot
41 from IOVObj import IOV
42 from CommonMethods import *
43 
44 try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
45  import json
46 except:
47  try:
48  import simplejson as json
49  except:
50  error = "Please set a crab environment in order to get the proper JSON lib"
51  exit(error)
52 
53 #####################################################################################
54 # General functions
55 #####################################################################################
56 def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
57  #return 582088327592295
58  listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName
59  dbError = commands.getstatusoutput( listIOVCommand )
60  if dbError[0] != 0 :
61  if dbError[1].find("metadata entry \"" + tagName + "\" does not exist") != -1:
62  print "Creating a new tag because I got the following error contacting the DB"
63  print dbError[1]
64  return 1
65  #return 133928
66  else:
67  exit("ERROR: Can\'t connect to db because:\n" + dbError[1])
68 
69 
70  aCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
71  output = commands.getstatusoutput( aCommand )
72 
73  #WARNING when we pass to lumi IOV this should be long long
74  if output[1] == '':
75  exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")
76 
77  return long(output[1])
78 
79 ########################################################################
80 def getListOfFilesToProcess(dataSet,lastRun=-1):
81  queryCommand = "dbs --search --query \"find file where dataset=" + dataSet
82  if lastRun != -1:
83  queryCommand = queryCommand + " and run > " + str(lastRun)
84  queryCommand = queryCommand + "\" | grep .root"
85 # print " >> " + queryCommand
86  output = commands.getstatusoutput( queryCommand )
87  return output[1].split('\n')
88 
89 ########################################################################
91  queryCommand = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
92  #print " >> " + queryCommand
93  output = commands.getstatusoutput( queryCommand )
94  if output[0] != 0:
95  return 0
96  else:
97  return len(output[1].split('\n'))
98 
99 ########################################################################
100 def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
101  datasetList = dataSet.split(',')
102  outputList = []
103  for data in datasetList:
104  queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
105  if lastRun != -1:
106  queryCommand = queryCommand + " and run > " + str(lastRun)
107  queryCommand += "\""
108  print " >> " + queryCommand
109  output = []
110  for i in range(0,3):
111  output = commands.getstatusoutput( queryCommand )
112  if output[0] == 0 and not (output[1].find("ERROR") != -1 or output[1].find("Error") != -1) :
113  break
114  if output[0] != 0:
115  exit("ERROR: I can't contact DBS for the following reason:\n" + output[1])
116  #print output[1]
117  tmpList = output[1].split('\n')
118  for file in tmpList:
119  outputList.append(file)
120  runsAndLumis = {}
121  for out in outputList:
122  regExp = re.search('(\d+)\s+(\d+)',out)
123  if regExp:
124  run = long(regExp.group(1))
125  lumi = long(regExp.group(2))
126  if not run in runsAndLumis:
127  runsAndLumis[run] = []
128  runsAndLumis[run].append(lumi)
129 
130 # print runsAndLumis
131 # exit("ok")
132  return runsAndLumis
133 
134 #####################################################################################
135 def getListOfRunsAndLumiFromFile(firstRun=-1,fileName=""):
136  file = open(fileName);
137  jsonFile = file.read();
138  file.close()
139  jsonList=json.loads(jsonFile);
140 
141  selected_dcs = {};
142  for element in jsonList:
143  selected_dcs[long(element)]=jsonList[element]
144  return selected_dcs
145 
146 ########################################################################
147 def getListOfRunsAndLumiFromRR(firstRun=-1):
148  RunReg ="http://pccmsdqm04.cern.ch/runregistry"
149  #RunReg = "http://localhost:40010/runregistry"
150  #Dataset=%Online%
151  Group = "Collisions10"
152 
153  # get handler to RR XML-RPC server
154  FULLADDRESS=RunReg + "/xmlrpc"
155  #print "RunRegistry from: ",FULLADDRESS
156  server = xmlrpclib.ServerProxy(FULLADDRESS)
157  #sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun) + " and {datasetName} LIKE '" + Dataset + "'"
158  sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun)
159  #sel_dcstable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"
160 
161  maxAttempts = 3;
162  tries = 0;
163  while tries<maxAttempts:
164  try:
165  run_data = server.DataExporter.export('RUN' , 'GLOBAL', 'csv_runs', sel_runtable)
166  #dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json' , sel_dcstable)
167  break
168  except:
169  print "Something wrong in accessing runregistry, retrying in 2s....", tries, "/", maxAttempts
170  tries += 1
171  time.sleep(2)
172  if tries==maxAttempts:
173  error = "Run registry unaccessible.....exiting now"
174  return {};
175 
176 
177  listOfRuns=[]
178  for line in run_data.split("\n"):
179  run=line.split(',')[0]
180  if run.isdigit():
181  listOfRuns.append(run)
182 
183 
184  firstRun = listOfRuns[len(listOfRuns)-1];
185  lastRun = listOfRuns[0];
186  sel_dcstable="{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"
187 
188  tries = 0;
189  while tries<maxAttempts:
190  try:
191  #run_data = server.DataExporter.export('RUN' , 'GLOBAL', 'csv_runs', sel_runtable)
192  dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json' , sel_dcstable)
193  break
194  except:
195  print "I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....", tries, "/", maxAttempts
196  tries += 1
197  time.sleep(2)
198  if tries==maxAttempts:
199  error = "Run registry unaccessible.....exiting now"
200  return {};
201 
202  selected_dcs={}
203  jsonList=json.loads(dcs_data)
204 
205  #for element in jsonList:
206  for element in listOfRuns:
207  #if element in listOfRuns:
208  if element in jsonList:
209  selected_dcs[long(element)]=jsonList[element]
210  else:
211  print "WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!"
212  selected_dcs[long(element)]= [[]]
213  #print selected_dcs
214  return selected_dcs
215 
216 ########################################################################
217 def getLastClosedRun(DBSListOfFiles):
218  runs = []
219  for file in DBSListOfFiles:
220  runNumber = getRunNumberFromDBSName(file)
221  if runs.count(runNumber) == 0:
222  runs.append(runNumber)
223 
224  if len(runs) <= 1: #No closed run
225  return -1
226  else:
227  runs.sort()
228  return long(runs[len(runs)-2])
229 
230 ########################################################################
232 # regExp = re.search('(\D+)_(\d+)_(\d+)_(\d+)',fileName)
233  regExp = re.search('(\D+)_(\d+)_(\d+)_',fileName)
234  if not regExp:
235  return -1
236  return long(regExp.group(3))
237 
238 ########################################################################
240  regExp = re.search('(\D+)/(\d+)/(\d+)/(\d+)/(\D+)',fileName)
241  if not regExp:
242  return -1
243  return long(regExp.group(3)+regExp.group(4))
244 
245 ########################################################################
246 def getNewRunList(fromDir,lastUploadedIOV):
247  newRunList = []
248  listOfFiles = ls(fromDir,".txt")
249  runFileMap = {}
250  for fileName in listOfFiles:
251  runNumber = getRunNumberFromFileName(fileName)
252  if runNumber > lastUploadedIOV:
253  newRunList.append(fileName)
254  return newRunList
255 
256 ########################################################################
257 def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
258  runsAndLumisProcessed = {}
259  runsAndFiles = {}
260  for fileName in newRunList:
261  file = open(runListDir+fileName)
262  for line in file:
263  if line.find("Runnumber") != -1:
264  run = long(line.replace('\n','').split(' ')[1])
265  elif line.find("LumiRange") != -1:
266  lumiLine = line.replace('\n','').split(' ')
267  begLumi = long(lumiLine[1])
268  endLumi = long(lumiLine[3])
269  if begLumi != endLumi:
270  error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
271  exit(error)
272  else:
273  if not run in runsAndLumisProcessed:
274  runsAndLumisProcessed[run] = []
275  if begLumi in runsAndLumisProcessed[run]:
276  print "Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!"
277  else:
278  runsAndLumisProcessed[run].append(begLumi)
279  if not run in runsAndFiles:
280  runsAndFiles[run] = []
281  runsAndFiles[run].append(fileName)
282  file.close()
283 
284  rrKeys = listOfRunsAndLumiFromRR.keys()
285  rrKeys.sort()
286  dbsKeys = listOfRunsAndLumiFromDBS.keys()
287  dbsKeys.sort()
288  #I remove the last entry from DBS since I am not sure it is an already closed run!
289  lastUnclosedRun = dbsKeys.pop()
290  #print "Last unclosed run: " + str(lastUnclosedRun)
291  procKeys = runsAndLumisProcessed.keys()
292  procKeys.sort()
293  #print "Run Registry:"
294  #print rrKeys
295  #print "DBS:"
296  #print dbsKeys
297  #print "List:"
298  #print procKeys
299  #print lastUnclosedRun
300  filesToProcess = []
301  for run in rrKeys:
302  RRList = []
303  for lumiRange in listOfRunsAndLumiFromRR[run]:
304  if lumiRange != []:
305  for l in range(lumiRange[0],lumiRange[1]+1):
306  RRList.append(long(l))
307  if run in procKeys and run < lastUnclosedRun:
308  #print "run " + str(run) + " is in procKeys"
309  if not run in dbsKeys and run != lastUnclosedRun:
310  error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!"
311  exit(error)
312  print "Working on run " + str(run)
313  nFiles = 0
314  for data in dataSet.split(','):
315  nFiles = getNumberOfFilesToProcessForRun(data,run)
316  if nFiles != 0:
317  break
318  if len(runsAndFiles[run]) < nFiles:
319  print "I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run)
320  if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
321  timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
322  timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
323  if timeoutType == 1:
324  print "WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!"
325  else:
326  if timeoutType == -1:
327  print "WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
328  else:
329  print "WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress."
330  return filesToProcess
331  else:
332  timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
333  if timeoutType == 1:
334  error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
335  sendEmail(mailList,error)
336  return filesToProcess
337  #exit(error)
338  else:
339  if timeoutType == -1:
340  print "WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!"
341  else:
342  print "WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress."
343  return filesToProcess
344 
345  else:
346  timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
347  timeoutManager("DBS_MISMATCH_Run"+str(run))
348  print "I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!"
349  errors = []
350  badProcessed = []
351  badDBSProcessed = []
352  badDBS = []
353  badRRProcessed = []
354  badRR = []
355  #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
356  badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
357  for i in range(0,len(errors)):
358  errors[i] = errors[i].replace("listA","the processed lumis")
359  errors[i] = errors[i].replace("listB","DBS")
360  #print errors
361  #print badProcessed
362  #print badDBS
363  #exit("ciao")
364  if len(badDBS) != 0:
365  print "This is weird because I processed more lumis than the ones that are in DBS!"
366  if len(badDBSProcessed) != 0 and run in rrKeys:
367  lastError = len(errors)
368  #print RRList
369  #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
370  badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
371  for i in range(0,len(errors)):
372  errors[i] = errors[i].replace("listA","the processed lumis")
373  errors[i] = errors[i].replace("listB","Run Registry")
374  #print errors
375  #print badProcessed
376  #print badRunRegistry
377 
378  if len(badRRProcessed) != 0:
379  print "I have not processed some of the lumis that are in the run registry for run: " + str(run)
380  for lumi in badDBSProcessed:
381  if lumi in badRRProcessed:
382  badProcessed.append(lumi)
383  lenA = len(badProcessed)
384  lenB = len(RRList)
385  if 100.*lenA/lenB <= dbsTolerancePercent:
386  print "WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
387  #print errors
388  badProcessed = []
389  elif lenA <= dbsTolerance:
390  print "WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis"
391  #print errors
392  badProcessed = []
393  else:
394  error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
395  sendEmail(mailList,error)
396  print error
397  return filesToProcess
398  #exit(errors)
399  #return filesToProcess
400  elif len(errors) != 0:
401  print "The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!"
402  #print errors
403 
404  #If I get here it means that I passed or the DBS or the RR test
405  if len(badProcessed) == 0:
406  for file in runsAndFiles[run]:
407  filesToProcess.append(file)
408  else:
409  #print errors
410  print "This should never happen because if I have errors I return or exit! Run: " + str(run)
411  else:
412  error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
413  print error
414  timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
415  if timeoutType == 1:
416  if len(RRList) <= rrTolerance:
417  error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
418  #print listOfRunsAndLumiFromRR[run]
419  print error
420  #sendEmail(mailList,error)
421  else:
422  error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
423  sendEmail(mailList,error)
424  return filesToProcess
425  #exit(error)
426  else:
427  if timeoutType == -1:
428  print "WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!"
429  else:
430  print "WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress."
431  return filesToProcess
432 
433  return filesToProcess
434 ########################################################################
435 def compareLumiLists(listA,listB,errors=[],tolerance=0):
436  lenA = len(listA)
437  lenB = len(listB)
438  if lenA < lenB-(lenB*float(tolerance)/100):
439  errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")
440  #else:
441  #errors.append("Lumi check ok!listA(" + str(lenA) + ")-(" + str(lenB) + ")listB")
442  #print errors
443  listA.sort()
444  listB.sort()
445  #shorter = lenA
446  #if lenB < shorter:
447  # shorter = lenB
448  #a = 0
449  #b = 0
450  badA = []
451  badB = []
452  #print listB
453  #print listA
454  #print len(listA)
455  #print len(listB)
456  #counter = 1
457  for lumi in listA:
458  #print str(counter) + "->" + str(lumi)
459  #counter += 1
460  if not lumi in listB:
461  errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
462  badB.append(lumi)
463  #print "Bad B: " + str(lumi)
464  #exit("hola")
465  for lumi in listB:
466  if not lumi in listA:
467  errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
468  badA.append(lumi)
469  #print "Bad A: " + str(lumi)
470 
471  return badA,badB
472 
473 ########################################################################
474 def removeUncompleteRuns(newRunList,dataSet):
475  processedRuns = {}
476  for fileName in newRunList:
477  run = getRunNumberFromFileName(fileName)
478  if not run in processedRuns:
479  processedRuns[run] = 0
480  processedRuns[run] += 1
481 
482  for run in processedRuns.keys():
483  nFiles = getNumberOfFilesToProcessForRun(dataSet,run)
484  if processedRuns[run] < nFiles:
485  print "I haven't processed all files yet : " + str(processedRuns[run]) + " out of " + str(nFiles) + " for run: " + str(run)
486  else:
487  print "All files have been processed for run: " + str(run) + " (" + str(processedRuns[run]) + " out of " + str(nFiles) + ")"
488 
489 ########################################################################
490 def aselectFilesToProcess(listOfFilesToProcess,newRunList):
491  selectedFiles = []
492  runsToProcess = {}
493  processedRuns = {}
494  for file in listOfFilesToProcess:
495  run = getRunNumberFromDBSName(file)
496 # print "To process: " + str(run)
497  if run not in runsToProcess:
498  runsToProcess[run] = 1
499  else:
500  runsToProcess[run] = runsToProcess[run] + 1
501 
502  for file in newRunList:
503  run = getRunNumberFromFileName(file)
504 # print "Processed: " + str(run)
505  if run not in processedRuns:
506  processedRuns[run] = 1
507  else:
508  processedRuns[run] = processedRuns[run] + 1
509 
510  #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
511  lastClosedRun = getLastClosedRun(listOfFilesToProcess)
512 # print "LastClosedRun:-" + str(lastClosedRun) + "-"
513 
514  processedRunsKeys = processedRuns.keys()
515  processedRunsKeys.sort()
516 
517  for run in processedRunsKeys:
518  if run <= lastClosedRun :
519  print "For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!"
520  if not run in runsToProcess:
521  exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
522  lumiList = getDBSLumiListForRun(run)
523  if processedRuns[run] == runsToProcess[run]:
524  for file in newRunList:
525  if run == getRunNumberFromFileName(file):
526  selectedFiles.append(file)
527  else:
528  exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
529  return selectedFiles
530 
531 ########################################################################
532 def main():
533  ######### COMMAND LINE OPTIONS ##############
534  option,args = parse(__doc__)
535 
536  ######### Check if there is already a megascript running ########
537  if option.lock:
538  setLockName('.' + option.lock)
539  if checkLock():
540  print "There is already a megascript runnning...exiting"
541  return
542  else:
543  lock()
544 
545 
546  destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
547  if option.Test:
548  destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'
549 
550  ######### CONFIGURATION FILE ################
551  cfgFile = "BeamSpotWorkflow.cfg"
552  if option.cfg:
553  cfgFile = option.cfg
554  configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
555  configuration = ConfigParser.ConfigParser()
556  print 'Reading configuration from ', configurationFile
557  configuration.read(configurationFile)
558 
559  sourceDir = configuration.get('Common','SOURCE_DIR')
560  archiveDir = configuration.get('Common','ARCHIVE_DIR')
561  workingDir = configuration.get('Common','WORKING_DIR')
562  databaseTag = configuration.get('Common','DBTAG')
563  dataSet = configuration.get('Common','DATASET')
564  fileIOVBase = configuration.get('Common','FILE_IOV_BASE')
565  dbIOVBase = configuration.get('Common','DB_IOV_BASE')
566  dbsTolerance = float(configuration.get('Common','DBS_TOLERANCE'))
567  dbsTolerancePercent = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
568  rrTolerance = float(configuration.get('Common','RR_TOLERANCE'))
569  missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
570  missingLumisTimeout = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
571  jsonFileName = configuration.get('Common','JSON_FILE')
572  mailList = configuration.get('Common','EMAIL')
573 
574  ######### DIRECTORIES SETUP #################
575  if sourceDir[len(sourceDir)-1] != '/':
576  sourceDir = sourceDir + '/'
577  if not dirExists(sourceDir):
578  error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
579  sendEmail(mailList,error)
580  exit(error)
581 
582  if archiveDir[len(archiveDir)-1] != '/':
583  archiveDir = archiveDir + '/'
584  if not os.path.isdir(archiveDir):
585  os.mkdir(archiveDir)
586 
587  if workingDir[len(workingDir)-1] != '/':
588  workingDir = workingDir + '/'
589  if not os.path.isdir(workingDir):
590  os.mkdir(workingDir)
591  else:
592  os.system("rm -f "+ workingDir + "*")
593 
594 
595  print "Getting last IOV for tag: " + databaseTag
596  lastUploadedIOV = 1
597  if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT":
598  lastUploadedIOV = getLastUploadedIOV(databaseTag)
599  else:
600  lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)
601 
602  #lastUploadedIOV = 133885
603  #lastUploadedIOV = 575216380019329
604  if dbIOVBase == "lumiid":
605  lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]
606 
607  ######### Get list of files processed after the last IOV
608  print "Getting list of files processed after IOV " + str(lastUploadedIOV)
609  newProcessedRunList = getNewRunList(sourceDir,lastUploadedIOV)
610  if len(newProcessedRunList) == 0:
611  exit("There are no new runs after " + str(lastUploadedIOV))
612 
613  ######### Copy files to archive directory
614  print "Copying files to archive directory"
615  copiedFiles = []
616  for i in range(3):
617  copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)
618  if len(copiedFiles) == len(newProcessedRunList):
619  break;
620  if len(copiedFiles) != len(newProcessedRunList):
621  error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList))
622  sendEmail(mailList,error)
623  exit(error)
624 
625 
626  ######### Get from DBS the list of files after last IOV
627  #listOfFilesToProcess = getListOfFilesToProcess(dataSet,lastUploadedIOV)
628  print "Getting list of files from DBS"
629  listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
630  if len(listOfRunsAndLumiFromDBS) == 0:
631  exit("There are no files in DBS to process")
632  print "Getting list of files from RR"
633  listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromRR(lastUploadedIOV)
634  if(not listOfRunsAndLumiFromRR):
635  print "Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName
636  listOfRunsAndLumiFromRR = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName)
637  ######### Get list of files to process for DB
638  #selectedFilesToProcess = selectFilesToProcess(listOfFilesToProcess,copiedFiles)
639  #completeProcessedRuns = removeUncompleteRuns(copiedFiles,dataSet)
640  #print copiedFiles
641  #print completeProcessedRuns
642  #exit("complete")
643  print "Getting list of files to process"
644  selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
645  if len(selectedFilesToProcess) == 0:
646  exit("There are no files to process")
647 
648  #print selectedFilesToProcess
649  ######### Copy files to working directory
650  print "Copying files from archive to working directory"
651  copiedFiles = []
652  for i in range(3):
653  copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)
654  if len(copiedFiles) == len(selectedFilesToProcess):
655  break;
656  else:
657  commands.getstatusoutput("rm -rf " + workingDir)
658  if len(copiedFiles) != len(selectedFilesToProcess):
659  error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir
660  sendEmail(mailList,error)
661  exit(error)
662 
663  print "Sorting and cleaning beamlist"
664  beamSpotObjList = []
665  for fileName in copiedFiles:
666  readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)
667 
668  sortAndCleanBeamList(beamSpotObjList,fileIOVBase)
669 
670  if len(beamSpotObjList) == 0:
671  error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
672  exit(error)
673 
674  payloadFileName = "PayloadFile.txt"
675 
676  runBased = False
677  if dbIOVBase == "runnumber":
678  runBased = True
679 
680  payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
681  if len(payloadList) == 0:
682  error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
683  exit(error)
684 
685 
686  tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
687  tmpSqliteFileName = workingDir + "SingleTmpSqliteFile.db"
688 
689  writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
690  readDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
691  payloadNumber = -1
692  iovSinceFirst = '0';
693  iovTillLast = '0';
694 
695  #Creating the final name for the combined sqlite file
696  uuid = commands.getstatusoutput('uuidgen -t')[1]
697  final_sqlite_file_name = databaseTag + '@' + uuid
698  sqlite_file = workingDir + final_sqlite_file_name + ".db"
699  metadata_file = workingDir + final_sqlite_file_name + ".txt"
700 
701  for payload in payloadList:
702  payloadNumber += 1
703  if option.zlarge:
704  payload.sigmaZ = 10
705  payload.sigmaZerr = 2.5e-05
706  tmpFile = file(tmpPayloadFileName,'w')
707  dumpValues(payload,tmpFile)
708  tmpFile.close()
709  if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
710  error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
711  exit(error)
712  readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)
713 
714  ##############################################################
715  #WARNING I am not sure if I am packing the right values
716  if dbIOVBase == "runnumber":
717  iov_since = str(payload.Run)
718  iov_till = iov_since
719  elif dbIOVBase == "lumiid":
720  iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
721  iov_till = str( pack(int(payload.Run), int(payload.IOVlast)) )
722  elif dbIOVBase == "timestamp":
723  error = "ERROR: IOV " + dbIOVBase + " still not implemented."
724  exit(error)
725  else:
726  error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
727  exit(error)
728 
729  if payloadNumber == 0:
730  iovSinceFirst = iov_since
731  if payloadNumber == len(payloadList)-1:
732  iovTillLast = iov_till
733 
734  appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
735  os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)
736 
737 
738  #### CREATE payload for merged output
739 
740  print " create MERGED payload card for dropbox ..."
741 
742  dfile = open(metadata_file,'w')
743 
744  dfile.write('destDB ' + destDB +'\n')
745  dfile.write('tag ' + databaseTag +'\n')
746  dfile.write('inputtag' +'\n')
747  dfile.write('since ' + iovSinceFirst +'\n')
748  #dfile.write('till ' + iov_till +'\n')
749  dfile.write('Timetype '+ dbIOVBase +'\n')
750 
751  ###################################################
752  # WARNING tagType forced to offline
753  print "WARNING TAG TYPE forced to be just offline"
754  tagType = "offline"
755  checkType = tagType
756  if tagType == "express":
757  checkType = "hlt"
758  dfile.write('IOVCheck ' + checkType + '\n')
759  dfile.write('usertext Beam spot position\n')
760 
761  dfile.close()
762 
763 
764 
765  if option.upload:
766  print " scp files to offline Drop Box"
767  dropbox = "/DropBox"
768  if option.Test:
769  dropbox = "/DropBox_test"
770  print "UPLOADING TO TEST DB"
771  uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)
772 
773  archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
774  archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
775  if not os.path.isdir(archiveDir + 'payloads'):
776  os.mkdir(archiveDir + 'payloads')
777  commands.getstatusoutput('mv ' + sqlite_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
778  commands.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
779  commands.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)
780 
781  print archiveDir + "payloads/" + archive_sqlite_file_name + '.db'
782  print archiveDir + "payloads/" + archive_sqlite_file_name + '.txt'
783 
784  rmLock()
785 
786 if __name__ == '__main__':
787  main()
def sendEmail
General utilities.
Evaluator * parse(const T &text)
def createWeightedPayloads
CREATE FILE FOR PAYLOADS.
def replace
Definition: linker.py:10
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
def getNumberOfFilesToProcessForRun
def getLastUploadedIOV
General functions.
perl if(1 lt scalar(@::datatypes))
Definition: edlooper.cc:31
def sortAndCleanBeamList
Sort and clean list of data for consecutive duplicates and bad fits.
double split
Definition: MVATrainer.cc:139