CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
dqmPostProcessing_online.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 import os, time, sys, shutil, glob, smtplib, re
4 from datetime import datetime
5 from email.MIMEText import MIMEText
6 #from ROOT import TFile
7 sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
8 
9 DIR = '/data/dqm/dropbox' # directory to search new files
10 DB = '/home/dqm/dqm.db' #master db
11 TMPDB = '/home/dqm/dqm.db.tmp' # temporal db
12 FILEDIR = '/data/dqm/merged' # directory, to which merged file is stored
13 DONEDIR = '/data/dqm/done' # directory, to which processed files are stored
14 WAITTIME = 120 # waiting time for new files (sec)
15 MAX_TOTAL_RUNS = 400
16 MAX_RUNS = 10
17 
18 YourMail = "lilopera@cern.ch"
19 ServerMail = "dqm@srv-C2D05-19.cms"
20 
21 def sendmail(EmailAddress,run):
22  s=smtplib.SMTP("localhost")
23  tolist=[EmailAddress, "lat@cern.ch"]
24  body="File merge failed by unknown reason for run"+run
25  msg = MIMEText(body)
26  msg['Subject'] = "File merge failed."
27  msg['From'] = ServerMail
28  msg['To'] = EmailAddress
29  s.sendmail(ServerMail,tolist,msg.as_string())
30  s.quit()
31 
32 def filecheck(rootfile):
33  f = TFile(rootfile)
34  if (f.IsZombie()):
35  #print "File corrupted"
36  f.Close()
37  return 0
38  else:
39  hist = f.FindObjectAny("reportSummaryContents")
40  #(skip filecheck for HcalTiming files!!)
41  if (hist == None and rootfile.rfind('HcalTiming') == -1):
42  #print "File is incomplete"
43  f.Close()
44  return 0
45  else:
46  #print "File is OK"
47  f.Close()
48  return 1
49 
50 while True:
51  #### search new files
52  NRUNS = 0
53  NFOUND = 0
54  NEW = {}
55  for dir, subdirs, files in os.walk(DIR):
56  for f in files:
57  if not f.startswith("DQM_Reference") and re.match(r'^DQM_.*_R[0-9]{9}\.root$', f):
58  runnr = f[-14:-5]
59  donefile = "%s/%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6], f)
60  f = "%s/%s" % (dir, f)
61  if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(f).st_size:
62  print "WARNING: %s was already processed but re-appeared" % f
63  os.remove(f)
64  continue
65  NEW.setdefault(runnr, []).append(f)
66  NFOUND += 1
67 
68  if NFOUND:
69  print '%s: found %d new files in %d runs.' % (datetime.now(), NFOUND, len(NEW))
70 
71  newFiles = []
72  allOldFiles = []
73  for run in sorted(NEW.keys())[::-1]:
74  NRUNS += 1
75  if NRUNS > MAX_RUNS:
76  break
77 
78  files = NEW[run]
79  runnr = "%09d" % long(run)
80  destdir = "%s/%s/%s" % (FILEDIR, runnr[0:3], runnr[3:6])
81  donedir = "%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6])
82  oldfiles = sorted(glob.glob("%s/DQM_V????_R%s.root" % (destdir, runnr)))[::-1]
83  if len(oldfiles) > 0:
84  version = int(oldfiles[0][-20:-16]) + 1
85  files.append(oldfiles[0])
86  else:
87  version = 1
88 
89  if not os.path.exists(destdir):
90  os.makedirs(destdir)
91  if not os.path.exists(donedir):
92  os.makedirs(donedir)
93 
94  destfile = "%s/DQM_V%04d_R%s.root" % (destdir, version, runnr)
95  logfile = "%s.log" % destfile[:-5]
96  tmpdestfile = "%s.tmp" % destfile
97 
98  print 'Merging run %s to %s (adding %s to %s)' % (run, destfile, files, oldfiles)
99  LOGFILE = open(logfile, 'a')
100  LOGFILE.write(os.popen('DQMMergeFile %s %s' % (tmpdestfile, " ".join(files))).read())
101  LOGFILE.close()
102  if not os.path.exists(tmpdestfile):
103  print 'Failed merging files for run %s. Will try again later.' % run
104  sendmail(YourMail,run)
105  continue
106 
107  os.rename(tmpdestfile, destfile)
108  for f in files:
109  os.rename(f, "%s/%s" % (donedir, f.rsplit('/', 1)[1]))
110 
111  allOldFiles.extend(oldfiles)
112  newFiles.append((long(run), destfile))
113 
114  if os.path.exists(TMPDB):
115  os.remove(TMPDB)
116 
117  if os.path.exists(DB):
118  os.rename(DB, TMPDB)
119  else:
120  os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run"' % TMPDB)
121 
122  if len(allOldFiles) > 0:
123  os.system('set -x; visDQMUnregisterFile %s %s' % (TMPDB, " ".join(allOldFiles)))
124 
125  existing = [long(x) for x in os.popen("sqlite3 %s 'select distinct runnr from t_data'" % TMPDB).read().split()]
126  for runnr, file in newFiles:
127  print 'Registering %s for run %d' % (file, runnr)
128  older = sorted([x for x in existing if x < runnr])
129  newer = sorted([x for x in existing if x > runnr])
130  if len(newer) > MAX_TOTAL_RUNS:
131  print "Too many newer runs (%d), not registering %s for run %d" % (len(newer), file, runnr)
132  continue
133 
134  if len(older) > MAX_TOTAL_RUNS:
135  print "Too many older runs (%d), pruning data for oldest run %d" % (len(older), older[0])
136  os.system(r"set -x; sqlite3 %s 'delete from t_data where runnr = %d'" % (TMPDB, older[0]))
137  os.system(r"set -x; sqlite3 %s 'delete from t_files where name like '\''%%R%09d.root'\'" % (TMPDB, older[0]))
138  os.system(r"set -x; sqlite3 %s 'vacuum'" % TMPDB)
139  existing.remove(older[0])
140 
141  os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run" %s' % (TMPDB, file))
142  existing.append(runnr)
143 
144  os.rename(TMPDB, DB)
145 
146  if NRUNS <= MAX_RUNS:
147  time.sleep(WAITTIME)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
double split
Definition: MVATrainer.cc:139