CMS 3D CMS Logo

fileCollector.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 from __future__ import print_function
3 import os, time, sys, glob, re, shutil, stat, smtplib, socket
4 from email.MIMEText import MIMEText
5 from fcntl import lockf, LOCK_EX, LOCK_UN
6 from hashlib import md5
7 from traceback import print_exc, format_exc
8 from datetime import datetime
9 from subprocess import Popen,PIPE
# Re-open stdout unbuffered so log lines show up immediately.
# NOTE: buffering=0 on a text stream only works on Python 2.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

# Fail with a readable message instead of an IndexError when arguments are missing.
if len(sys.argv) < 5:
    sys.stderr.write("Usage: %s EMAIL COLLECTDIR TFILEDONEDIR DROPBOX\n" % sys.argv[0])
    sys.exit(1)

EMAIL = sys.argv[1]         # Recipient ("To:") of error notification emails
COLLECTDIR = sys.argv[2]    # Directory from where to pick up root files
TFILEDONEDIR = sys.argv[3]  # Directory to store processed *_T files
DROPBOX = sys.argv[4]       # Directory where to leave the files

# Constants
WAITTIME = 10                # Seconds to sleep between polls and between retries
EMAILINTERVAL = 15 * 60      # Minimum time between sent emails, in seconds
SBASEDIR = os.path.abspath(__file__).rsplit("/", 1)[0]  # Directory of this script (holds the ROOT macros)
TMPDROPBOX = "%s/.tmpdropbox" % DROPBOX  # Staging area; files are renamed from here into DROPBOX
RETRIES = 3                  # Attempts to move one file into DROPBOX
SENDMAIL = "/usr/sbin/sendmail"  # sendmail location
HOSTNAME = socket.gethostname().lower()

# Control variables
lastEmailSent = datetime.now()  # Time the last email was sent; used to throttle emails
28 
29 # --------------------------------------------------------------------
def logme(msg, *args):
    """Print ``msg % args`` prefixed by the current time and ``[<script>/<pid>]``."""
    script = __file__.split("/")[-1]
    header = "[{0}/{1:d}]".format(script, os.getpid())
    print(datetime.now(), header, msg % args)
33 
def filecheck(rootfile):
    """Run the ``filechk.C`` ROOT macro (next to this script) on ``rootfile``.

    The last whitespace-separated token printed by ROOT is inspected.

    Returns:
        1 if ROOT reported ``(int)1``; 0 for any other result
        (including ``(int)0`` and ``(int)(-1)``).

    Raises:
        RuntimeError: if ROOT printed nothing at all. Raising (rather than
            returning 0) keeps a broken ROOT setup from making the main loop
            treat every file as incomplete and discard it.
    """
    # Pass the macro call as one argv entry instead of going through the
    # shell: paths containing spaces, quotes or shell metacharacters can no
    # longer break or inject into the command line.
    macro = '%s/filechk.C("%s")' % (SBASEDIR, rootfile)
    proc = Popen(["root", "-l", "-b", "-q", macro], stdout=PIPE,
                 universal_newlines=True)
    output = proc.communicate()[0]
    tokens = output.split()
    if not tokens:
        raise RuntimeError("filecheck: no output from ROOT for %s" % rootfile)
    return 1 if tokens[-1] == '(int)1' else 0
45 
def convert(infile, ofile):
    """Run the ``sistrip_reduce_file.C`` ROOT macro to turn ``infile`` into ``ofile``.

    ROOT's stdout/stderr are discarded. Callers detect failure by checking
    whether ``ofile`` exists afterwards.

    Returns:
        ROOT's exit status (previously None; existing callers ignore it).
    """
    # The former shell command redirected with ">& /dev/null", which is a
    # bash/csh-ism that POSIX /bin/sh rejects. Running via an argv list also
    # avoids quoting problems with unusual file names.
    macro = '%s/sistrip_reduce_file.C++("%s", "%s")' % (SBASEDIR, infile, ofile)
    with open(os.devnull, "w") as devnull:
        proc = Popen(["root", "-l", "-b", "-q", macro],
                     stdout=devnull, stderr=devnull)
        return proc.wait()
50 
def sendmail(body="Hello from visDQMZipCastorVerifier"):
    """Send ``body`` to EMAIL through ``SENDMAIL -t``; log a non-zero exit status."""
    proc = Popen("%s -t" % SENDMAIL, shell=True, stdin=PIPE)
    message = "".join([
        "To: %s\n" % EMAIL,
        "Subject: File Collector on server %s has a Critical Error\n" % HOSTNAME,
        "\n",  # blank line separating headers from body
        "%s\n" % body,
    ])
    proc.stdin.write(message)
    proc.stdin.close()
    status = proc.wait()
    if status:
        logme("ERROR: Sendmail exit with status %s", status)
62 
63 # --------------------------------------------------------------------
# Create every working directory (staging area, processed-file archive,
# drop box) before entering the main loop.
for _required_dir in (TMPDROPBOX, TFILEDONEDIR, DROPBOX):
    if not os.path.exists(_required_dir):
        os.makedirs(_required_dir)
del _required_dir
72 
# Candidate input files: DQM_V<4 digits>_<subsystem>_R<9-digit run>[_T<n>].root,
# or the same name with a "Playback" prefix.
_INPUT_FILE_RE = re.compile(
    r'^(?:DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$')


def _throttled_sendmail(body):
    """Email ``body`` unless an email was already sent within EMAILINTERVAL seconds."""
    global lastEmailSent
    now = datetime.now()
    # EMAILINTERVAL is a plain number of seconds. "now - EMAILINTERVAL" would
    # raise TypeError (datetime minus int), and from inside the top-level
    # exception handler that error would terminate the collector.
    if (now - lastEmailSent).total_seconds() > EMAILINTERVAL:
        sendmail(body)
        lastEmailSent = now


def _collect_new_files():
    """Return {run: {subsystem: [path, ...]}} for candidate files under COLLECTDIR.

    A file whose name already exists with the same size in TFILEDONEDIR was
    processed before; it is deleted and not returned.
    """
    new = {}
    for dirpath, _subdirs, names in os.walk(COLLECTDIR):
        for name in names:
            match = _INPUT_FILE_RE.match(name)
            if not match:
                continue
            run = int(match.group("runnr"))
            subsystem = match.group("subsys")
            runstr = "%09d" % run
            donefile = "%s/%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6], name)
            path = "%s/%s" % (dirpath, name)
            if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(path).st_size:
                logme("WARNING: File %s was already processed but re-appeared", path)
                os.remove(path)
                continue
            new.setdefault(run, {}).setdefault(subsystem, []).append(path)
    return new


def _run_end_limit(new):
    """Return the highest run number that may be processed now, or None.

    If tagfile_runend_<run> markers exist in COLLECTDIR, the largest marked
    run is used and all markers are deleted. Otherwise the second-highest
    run in ``new`` is used. Returns None when there are no markers and at
    most one run.
    """
    tags = glob.glob('%s/tagfile_runend_*' % COLLECTDIR)
    if not tags:
        if len(new) <= 1:
            return None
        return sorted(new, reverse=True)[1]
    # Parse the file name only: splitting the whole path on "_" picks the
    # wrong field when COLLECTDIR itself contains underscores.
    limit = max(int(os.path.basename(tag).split("_")[2]) for tag in tags)
    for tag in tags:
        os.remove(tag)
    return limit


def _send_to_dropbox(Tfile, finalTfile, finalTMPfile, subsystem, run):
    """Copy ``Tfile`` to the staging area and rename it into DROPBOX/V<n>/.

    V<n> is one more than the number of DROPBOX directories already holding
    a file with the same name. On success ``Tfile`` is moved to ``finalTfile``.
    Tries up to RETRIES times, sleeping WAITTIME after each failed attempt.

    Returns:
        True on success, False if every attempt failed.
    """
    basename = 'DQM_V0001_%s_R%09d.root' % (subsystem, run)
    originTMPFile = "%s.origin" % finalTMPfile
    perms = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
             stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
    for attempt in range(RETRIES):
        with open(Tfile, "rb") as fh:
            digest = md5(fh.read()).hexdigest()
        with open(originTMPFile, "w") as originFile:
            originFile.write("md5:%s %d %s" % (digest, os.stat(Tfile).st_size, Tfile))
        shutil.copy(Tfile, finalTMPfile)

        lFile = open("%s/lock" % TMPDROPBOX, "a")
        try:
            # Hold the lock file while choosing V<n> and renaming, so processes
            # sharing this lock file cannot claim the same version directory.
            lockf(lFile, LOCK_EX)
            version = 1 + sum(1 for _d, _s, vfiles in os.walk(DROPBOX)
                              if basename in vfiles)
            versiondir = "%s/V%04d" % (DROPBOX, version)
            if not os.path.exists(versiondir):
                os.makedirs(versiondir)
            finalfile = "%s/%s" % (versiondir, basename)
            originFileName = "%s.origin" % finalfile
            if os.path.exists(finalTMPfile) and os.stat(finalTMPfile).st_size == os.stat(Tfile).st_size:
                os.rename(Tfile, finalTfile)
                os.rename(finalTMPfile, finalfile)
                os.rename(originTMPFile, originFileName)
                os.chmod(finalfile, perms)
                os.chmod(originFileName, perms)
                logme("INFO: File %s has been successfully sent to the DROPBOX", Tfile)
                return True
        finally:
            # Release on every path (success, failure, exception) and before
            # sleeping, so other processes are not blocked during the retry wait.
            lockf(lFile, LOCK_UN)
            lFile.close()

        logme("ERROR: Problem transfering final file for run"
              " %09d. Retrying in %d", run, WAITTIME)
        if attempt == RETRIES - 1:
            _throttled_sendmail("ERROR: Problem transfering final file for run"
                                " %09d.\n Retrying in %d seconds" % (run, WAITTIME))
        time.sleep(WAITTIME)
    return False


def _process_subsystem(run, subsystem, files):
    """Send one file of a run/subsystem to DROPBOX and dispose of the others.

    Candidates are tried in priority order: the file without "_T" first,
    then the _T files in descending name order. Note that _T numbers compare
    as strings; this assumes they are zero-padded (TODO confirm). The first
    candidate passing filecheck() is sent. After that, or when a candidate
    fails the check, the first discarded file is kept as "<name>_d" in
    TFILEDONEDIR and later ones are deleted.
    """
    runstr = "%09d" % run
    finalTdir = "%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6])
    seed = HOSTNAME.replace("-", "t")[-6:]
    finalTMPfile = "%s/DQM_V0001_%s_R%09d.root.%s" % (TMPDROPBOX, subsystem, run, seed)
    done = False
    keeper = 0
    # Key function replacing an inconsistent cmp comparator: non-_T file ranks highest.
    Tfiles = sorted(files, key=lambda x: ("_T" not in x, x), reverse=True)
    for Tfile in Tfiles:
        finalTfile = "%s/%s" % (finalTdir, Tfile.split("/")[-1])
        if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)
        if os.path.exists(finalTMPfile):
            os.remove(finalTMPfile)

        if done or filecheck(Tfile) != 1:
            if not done:
                logme("INFO: File %s is incomplete looking for next"
                      " DQM_V*_%s_R%09d_T*.root valid file",
                      Tfile, subsystem, run)
            if keeper == 0:
                keeper += 1
                shutil.move(Tfile, finalTfile + "_d")
            else:
                os.remove(Tfile)
            continue

        if "Playback" in Tfile and "SiStrip" in Tfile:
            dqmfile = Tfile.replace('Playback', 'DQM')
            convert(Tfile, dqmfile)
            if not os.path.exists(dqmfile):
                logme("WARNING: Problem converting %s skiping", Tfile)
                shutil.move(Tfile, finalTfile + "_d")
                continue
            os.rename(Tfile, finalTfile.replace('Playback', 'Playback_full'))
            Tfile = dqmfile

        _send_to_dropbox(Tfile, finalTfile, finalTMPfile, subsystem, run)
        # Only one file per run/subsystem is sent, even if the transfer failed;
        # a failed Tfile stays in COLLECTDIR and is picked up again next poll.
        done = True


while True:
    try:
        NEW = _collect_new_files()
        if not NEW:
            time.sleep(WAITTIME)
            continue

        TAGRUNEND = _run_end_limit(NEW)
        if TAGRUNEND is None:
            # No run-end marker and at most one run: nothing is eligible yet.
            time.sleep(WAITTIME)
            continue

        for run, subsystems in NEW.items():
            if run > TAGRUNEND:
                continue
            for subsystem, files in subsystems.items():
                _process_subsystem(run, subsystem, files)

    except KeyboardInterrupt:
        sys.exit(0)

    except Exception as e:
        logme('ERROR: %s', e)
        _throttled_sendmail('ERROR: %s\n%s' % (e, format_exc()))
        print_exc()
def logme(msg, args)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def filecheck(rootfile)
def convert(infile, ofile)
def sendmail(body="Hello from visDQMZipCastorVerifier")
double split
Definition: MVATrainer.cc:139