CMS 3D CMS Logo

dqmPostProcessing_online.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 from __future__ import print_function
4 import os, time, sys, shutil, glob, smtplib, re
5 from datetime import datetime
6 from email.MIMEText import MIMEText
7 #from ROOT import TFile
8 sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
9 
10 DIR = '/data/dqm/dropbox' # directory to search new files
11 DB = '/home/dqm/dqm.db' #master db
12 TMPDB = '/home/dqm/dqm.db.tmp' # temporal db
13 FILEDIR = '/data/dqm/merged' # directory, to which merged file is stored
14 DONEDIR = '/data/dqm/done' # directory, to which processed files are stored
15 WAITTIME = 120 # waiting time for new files (sec)
16 MAX_TOTAL_RUNS = 400
17 MAX_RUNS = 10
18 
19 YourMail = "lilopera@cern.ch"
20 ServerMail = "dqm@srv-C2D05-19.cms"
21 
22 def sendmail(EmailAddress,run):
23  s=smtplib.SMTP("localhost")
24  tolist=[EmailAddress, "lat@cern.ch"]
25  body="File merge failed by unknown reason for run"+run
26  msg = MIMEText(body)
27  msg['Subject'] = "File merge failed."
28  msg['From'] = ServerMail
29  msg['To'] = EmailAddress
30  s.sendmail(ServerMail,tolist,msg.as_string())
31  s.quit()
32 
33 def filecheck(rootfile):
34  f = TFile(rootfile)
35  if (f.IsZombie()):
36  #print "File corrupted"
37  f.Close()
38  return 0
39  else:
40  hist = f.FindObjectAny("reportSummaryContents")
41  #(skip filecheck for HcalTiming files!!)
42  if (hist == None and rootfile.rfind('HcalTiming') == -1):
43  #print "File is incomplete"
44  f.Close()
45  return 0
46  else:
47  #print "File is OK"
48  f.Close()
49  return 1
50 
51 while True:
52 
53  NRUNS = 0
54  NFOUND = 0
55  NEW = {}
56  for dir, subdirs, files in os.walk(DIR):
57  for f in files:
58  if not f.startswith("DQM_Reference") and re.match(r'^DQM_.*_R[0-9]{9}\.root$', f):
59  runnr = f[-14:-5]
60  donefile = "%s/%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6], f)
61  f = "%s/%s" % (dir, f)
62  if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(f).st_size:
63  print("WARNING: %s was already processed but re-appeared" % f)
64  os.remove(f)
65  continue
66  NEW.setdefault(runnr, []).append(f)
67  NFOUND += 1
68 
69  if NFOUND:
70  print('%s: found %d new files in %d runs.' % (datetime.now(), NFOUND, len(NEW)))
71 
72  newFiles = []
73  allOldFiles = []
74  for run in sorted(NEW.keys())[::-1]:
75  NRUNS += 1
76  if NRUNS > MAX_RUNS:
77  break
78 
79  files = NEW[run]
80  runnr = "%09d" % long(run)
81  destdir = "%s/%s/%s" % (FILEDIR, runnr[0:3], runnr[3:6])
82  donedir = "%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6])
83  oldfiles = sorted(glob.glob("%s/DQM_V????_R%s.root" % (destdir, runnr)))[::-1]
84  if len(oldfiles) > 0:
85  version = int(oldfiles[0][-20:-16]) + 1
86  files.append(oldfiles[0])
87  else:
88  version = 1
89 
90  if not os.path.exists(destdir):
91  os.makedirs(destdir)
92  if not os.path.exists(donedir):
93  os.makedirs(donedir)
94 
95  destfile = "%s/DQM_V%04d_R%s.root" % (destdir, version, runnr)
96  logfile = "%s.log" % destfile[:-5]
97  tmpdestfile = "%s.tmp" % destfile
98 
99  print('Merging run %s to %s (adding %s to %s)' % (run, destfile, files, oldfiles))
100  LOGFILE = open(logfile, 'a')
101  LOGFILE.write(os.popen('DQMMergeFile %s %s' % (tmpdestfile, " ".join(files))).read())
102  LOGFILE.close()
103  if not os.path.exists(tmpdestfile):
104  print('Failed merging files for run %s. Will try again later.' % run)
105  sendmail(YourMail,run)
106  continue
107 
108  os.rename(tmpdestfile, destfile)
109  for f in files:
110  os.rename(f, "%s/%s" % (donedir, f.rsplit('/', 1)[1]))
111 
112  allOldFiles.extend(oldfiles)
113  newFiles.append((long(run), destfile))
114 
115  if os.path.exists(TMPDB):
116  os.remove(TMPDB)
117 
118  if os.path.exists(DB):
119  os.rename(DB, TMPDB)
120  else:
121  os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run"' % TMPDB)
122 
123  if len(allOldFiles) > 0:
124  os.system('set -x; visDQMUnregisterFile %s %s' % (TMPDB, " ".join(allOldFiles)))
125 
126  existing = [long(x) for x in os.popen("sqlite3 %s 'select distinct runnr from t_data'" % TMPDB).read().split()]
127  for runnr, file in newFiles:
128  print('Registering %s for run %d' % (file, runnr))
129  older = sorted([x for x in existing if x < runnr])
130  newer = sorted([x for x in existing if x > runnr])
131  if len(newer) > MAX_TOTAL_RUNS:
132  print("Too many newer runs (%d), not registering %s for run %d" % (len(newer), file, runnr))
133  continue
134 
135  if len(older) > MAX_TOTAL_RUNS:
136  print("Too many older runs (%d), pruning data for oldest run %d" % (len(older), older[0]))
137  os.system(r"set -x; sqlite3 %s 'delete from t_data where runnr = %d'" % (TMPDB, older[0]))
138  os.system(r"set -x; sqlite3 %s 'delete from t_files where name like '\''%%R%09d.root'\'" % (TMPDB, older[0]))
139  os.system(r"set -x; sqlite3 %s 'vacuum'" % TMPDB)
140  existing.remove(older[0])
141 
142  os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run" %s' % (TMPDB, file))
143  existing.append(runnr)
144 
145  os.rename(TMPDB, DB)
146 
147  if NRUNS <= MAX_RUNS:
148  time.sleep(WAITTIME)
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
cms::dd::split
std::vector< std::string_view > split(std::string_view, const char *)
dqmPostProcessing_online.sendmail
def sendmail(EmailAddress, run)
Definition: dqmPostProcessing_online.py:22
mps_setup.append
append
Definition: mps_setup.py:85
createfilelist.int
int
Definition: createfilelist.py:10
dqmPostProcessing_online.filecheck
def filecheck(rootfile)
Definition: dqmPostProcessing_online.py:33
edm::print
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38