CMS 3D CMS Logo

fileCollector.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 import os, time, sys, glob, re, shutil, stat, smtplib, socket
3 from email.MIMEText import MIMEText
4 from fcntl import lockf, LOCK_EX, LOCK_UN
5 from hashlib import md5
6 from traceback import print_exc, format_exc
7 from datetime import datetime
8 from subprocess import Popen,PIPE
9 sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
10 
11 EMAIL = sys.argv[1]
12 COLLECTDIR = sys.argv[2] # Directory from where to pick up root files
13 TFILEDONEDIR = sys.argv[3] # Directory to store processed *_T files
14 DROPBOX = sys.argv[4] # Directory where to liave the files
15 
16 # Constants
17 WAITTIME = 10
18 EMAILINTERVAL = 15 * 60 # Time between sent emails
19 SBASEDIR = os.path.abspath(__file__).rsplit("/",1)[0]
20 TMPDROPBOX = "%s/.tmpdropbox" % DROPBOX
21 RETRIES = 3
22 SENDMAIL = "/usr/sbin/sendmail" # sendmail location
23 HOSTNAME = socket.gethostname().lower()
24 
25 # Control variables
26 lastEmailSent = datetime.now()
27 
28 # --------------------------------------------------------------------
29 def logme(msg, *args):
30  procid = "[%s/%d]" % (__file__.rsplit("/", 1)[-1], os.getpid())
31  print datetime.now(), procid, msg % args
32 
33 def filecheck(rootfile):
34  cmd = 'root -l -b -q %s/filechk.C"(\\"%s\\")"' % (SBASEDIR,rootfile)
35  a = os.popen(cmd).read().split()
36  tag=a.pop()
37  if tag == '(int)(-1)' or tag == '(int)0':
38  return 0
39 
40  if tag == '(int)1':
41  return 1
42 
43  return 0
44 
45 def convert(infile, ofile):
46  cmd = 'root -l -b -q %s/sistrip_reduce_file.C++"' \
47  '(\\"%s\\", \\"%s\\")" >& /dev/null' % (SBASEDIR,infile, ofile)
48  os.system(cmd)
49 
50 def sendmail(body="Hello from visDQMZipCastorVerifier"):
51  scall = Popen("%s -t" % SENDMAIL, shell=True, stdin=PIPE)
52  scall.stdin.write("To: %s\n" % EMAIL)
53  scall.stdin.write("Subject: File Collector on server %s has a Critical Error\n" %
54  HOSTNAME)
55  scall.stdin.write("\n") # blank line separating headers from body
56  scall.stdin.write("%s\n" % body)
57  scall.stdin.close()
58  rc = scall.wait()
59  if rc != 0:
60  logme("ERROR: Sendmail exit with status %s", rc)
61 
62 # --------------------------------------------------------------------
63 if not os.path.exists(TMPDROPBOX):
64  os.makedirs(TMPDROPBOX)
65 
66 if not os.path.exists(TFILEDONEDIR):
67  os.makedirs(TFILEDONEDIR)
68 
69 if not os.path.exists(DROPBOX):
70  os.makedirs(DROPBOX)
71 
72 while True:
73  try:
74  NRUNS = 0 #Number of runs found
75  NFOUND = 0 #Number of files found
76  NEW = {}
77  TAGS= []
78  for dir, subdirs, files in os.walk(COLLECTDIR):
79  for f in files:
80  fMatch=re.match('^DQM_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$',f)
81  if not fMatch:
82  fMatch=re.match('^Playback_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$', f)
83 
84  if fMatch:
85  runnr = int(fMatch.group("runnr"))
86  subsystem=fMatch.group("subsys")
87  runstr="%09d" % runnr
88  donefile = "%s/%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6], f)
89  f = "%s/%s" % (dir, f)
90  if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(f).st_size:
91  logme("WARNING: File %s was already processed but re-appeared", f)
92  os.remove(f)
93  continue
94 
95  NEW.setdefault(runnr, {}).setdefault(subsystem,[]).append(f)
96  NFOUND += 1
97 
98  if len(NEW) == 0:
99  time.sleep(WAITTIME)
100  continue
101 
102  TAGS=sorted(glob.glob('%s/tagfile_runend_*' % COLLECTDIR ),reverse=True)
103  if len(TAGS)==0:
104  if len(NEW) <= 1:
105  time.sleep(WAITTIME)
106  continue
107 
108  TAGRUNEND=long(sorted(NEW.keys(),reverse=True)[1])
109 
110  else:
111  TAGRUNEND=long(TAGS[0].split("_")[2])
112 
113  for tag in TAGS:
114  os.remove(tag)
115 
116  for run,subsystems in NEW.items():
117  if run > TAGRUNEND:
118  continue
119 
120  for subsystem,files in subsystems.items():
121  done=False
122  keeper=0
123  Tfiles=sorted(files,cmp=lambda x,y: "_T" not in x and x != y and 1 or cmp(x,y))[::-1]
124  for Tfile in Tfiles:
125  seed=HOSTNAME.replace("-","t")[-6:]
126  finalTMPfile="%s/DQM_V0001_%s_R%09d.root.%s" % (TMPDROPBOX,subsystem,run,seed)
127  runstr="%09d" % run
128  finalTfile="%s/%s/%s/%s" % (TFILEDONEDIR,runstr[0:3],runstr[3:6],Tfile.split("/")[-1])
129  finalTdir="%s/%s/%s" % (TFILEDONEDIR,runstr[0:3],runstr[3:6])
130  if not os.path.exists(finalTdir):
131  os.makedirs(finalTdir)
132 
133  if os.path.exists(finalTMPfile):
134  os.remove(finalTMPfile)
135 
136  if done:
137  if keeper == 0:
138  keeper+=1
139  shutil.move(Tfile,finalTfile+"_d")
140 
141  else:
142  os.remove(Tfile)
143 
144  continue
145 
146  if filecheck(Tfile) != 1:
147  logme("INFO: File %s is incomplete looking for next"
148  " DQM_V*_%s_R%09d_T*.root valid file",
149  Tfile, subsystem, run)
150  if keeper == 0:
151  keeper+=1
152  shutil.move(Tfile,finalTfile+"_d")
153 
154  else:
155  os.remove(Tfile)
156 
157  continue
158 
159  if "Playback" in Tfile and "SiStrip" in Tfile:
160  dqmfile = Tfile.replace('Playback','DQM')
161  convert(Tfile,dqmfile)
162  if not os.path.exists(dqmfile):
163  logme("WARNING: Problem converting %s skiping", Tfile)
164  shutil.move(Tfile,finalTfile+"_d")
165  continue
166 
167  os.rename(Tfile,finalTfile.replace('Playback','Playback_full'))
168  Tfile=dqmfile
169 
170  for i in range(RETRIES):
171  md5Digest=md5(file(Tfile).read())
172  originStr="md5:%s %d %s" % (md5Digest.hexdigest(),os.stat(Tfile).st_size,Tfile)
173  originTMPFile="%s.origin" % finalTMPfile
174  originFile=open(originTMPFile,"w")
175  originFile.write(originStr)
176  originFile.close()
177  shutil.copy(Tfile,finalTMPfile)
178  version=1
179  lFile=open("%s/lock" % TMPDROPBOX ,"a")
180  lockf(lFile,LOCK_EX)
181  for vdir,vsubdir,vfiles in os.walk(DROPBOX):
182  if 'DQM_V0001_%s_R%09d.root' % (subsystem,run) not in vfiles:
183  continue
184  version += 1
185 
186  if not os.path.exists("%s/V%04d" % (DROPBOX,version)):
187  os.makedirs("%s/V%04d" % (DROPBOX,version))
188 
189  finalfile="%s/V%04d/DQM_V0001_%s_R%09d.root" % (DROPBOX,version,subsystem,run)
190  originFileName="%s.origin" % finalfile
191  if os.path.exists(finalTMPfile) and os.stat(finalTMPfile).st_size == os.stat(Tfile).st_size:
192  os.rename(Tfile,finalTfile)
193  os.rename(finalTMPfile,finalfile)
194  os.rename(originTMPFile,originFileName)
195  os.chmod(finalfile,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
196  os.chmod(originFileName,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
197  logme("INFO: File %s has been successfully sent to the DROPBOX" , Tfile)
198  lockf(lFile,LOCK_UN)
199  lFile.close()
200  break
201  else:
202  logme("ERROR: Problem transfering final file for run"
203  " %09d. Retrying in %d", run, WAITTIME)
204  if i == RETRIES-1:
205  now = datetime.now()
206  if now - EMAILINTERVAL > lastEmailSent:
207  sendmail("ERROR: Problem transfering final file for run"
208  " %09d.\n Retrying in %d seconds" % (run, WAITTIME))
209  lastEmailSent = now
210 
211  time.sleep(WAITTIME)
212  lockf(lFile,LOCK_UN)
213  lFile.close()
214  done=True
215 
216  except KeyboardInterrupt as e:
217  sys.exit(0)
218 
219  except Exception as e:
220  logme('ERROR: %s', e)
221  now = datetime.now()
222  if now - EMAILINTERVAL > lastEmailSent:
223  sendmail ('ERROR: %s\n%s' % (e, format_exc()))
224  lastEmailSent = now
225 
226  print_exc()
def logme(msg, args)
def filecheck(rootfile)
def convert(infile, ofile)
def sendmail(body="Hello from visDQMZipCastorVerifier")
double split
Definition: MVATrainer.cc:139