CMS 3D CMS Logo

fileCollector.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 from __future__ import print_function
3 from builtins import range
4 import os, time, sys, glob, re, shutil, stat, smtplib, socket
5 from email.MIMEText import MIMEText
6 from fcntl import lockf, LOCK_EX, LOCK_UN
7 from hashlib import md5
8 from traceback import print_exc, format_exc
9 from datetime import datetime
10 from subprocess import Popen,PIPE
# Reopen stdout with buffering disabled (third argument 0) so that log lines
# are written out immediately.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

# Fail with a readable usage message instead of a bare IndexError when the
# script is started with too few arguments.
if len(sys.argv) < 5:
    sys.stderr.write("Usage: %s EMAIL COLLECTDIR TFILEDONEDIR DROPBOX\n"
                     % sys.argv[0])
    sys.exit(1)

EMAIL = sys.argv[1]         # Address that receives the error notifications
COLLECTDIR = sys.argv[2]    # Directory from where to pick up root files
TFILEDONEDIR = sys.argv[3]  # Directory to store processed *_T files
DROPBOX = sys.argv[4]       # Directory where to leave the files

# Constants
WAITTIME = 10                # Seconds to sleep between polls / retries
EMAILINTERVAL = 15 * 60      # Time between sent emails (seconds)
SBASEDIR = os.path.abspath(__file__).rsplit("/", 1)[0]  # Directory of this script
TMPDROPBOX = "%s/.tmpdropbox" % DROPBOX  # Staging area inside the dropbox
RETRIES = 3                  # Transfer attempts per file
SENDMAIL = "/usr/sbin/sendmail"  # sendmail location
HOSTNAME = socket.gethostname().lower()

# Control variables
lastEmailSent = datetime.now()  # Time of the last notification (starts at launch)
29 
30 # --------------------------------------------------------------------
def logme(msg, *args):
    """Print a timestamped log line tagged with the script name and PID.

    msg  -- message text; treated as a %-format string only when *args*
            are supplied, so a message that merely contains a literal
            '%' (e.g. an exception text) is printed verbatim instead of
            raising a formatting error.
    args -- values interpolated into *msg* with the % operator.
    """
    procid = "[%s/%d]" % (__file__.rsplit("/", 1)[-1], os.getpid())
    text = msg % args if args else msg
    print(datetime.now(), procid, text)
34 
def filecheck(rootfile):
    """Run the filechk.C ROOT macro on *rootfile* and return its verdict.

    The macro located next to this script (SBASEDIR) is executed through
    ``root -l -b -q``; the last whitespace-separated token of ROOT's
    standard output is taken as the result.

    Returns 1 when that token is ``(int)1`` and 0 for any other token
    (including ``(int)0`` and ``(int)(-1)``).

    Raises RuntimeError when ROOT printed nothing at all, and OSError when
    the ``root`` executable cannot be started.
    """
    # Pass the macro call as a single argv element (no shell), so special
    # characters in the file name are not interpreted by a shell.
    macro_call = '%s/filechk.C("%s")' % (SBASEDIR, rootfile)
    proc = Popen(["root", "-l", "-b", "-q", macro_call],
                 stdout=PIPE, universal_newlines=True)
    tokens = proc.communicate()[0].split()
    if not tokens:
        # Without this guard an empty output would surface as an opaque
        # "pop from empty list" IndexError.
        raise RuntimeError("filechk.C produced no output for %s "
                           "(root exit status %s)"
                           % (rootfile, proc.returncode))

    return 1 if tokens[-1] == '(int)1' else 0
46 
def convert(infile, ofile):
    """Run the sistrip_reduce_file.C macro to produce *ofile* from *infile*.

    The macro (looked up in SBASEDIR) is compiled and run by ROOT
    (``.C++``) with all of ROOT's output discarded.  Nothing is returned;
    callers detect failure by checking whether *ofile* exists afterwards.
    """
    macro_call = '%s/sistrip_reduce_file.C++("%s", "%s")' % (
        SBASEDIR, infile, ofile)
    # Redirect output ourselves instead of relying on the shell-specific
    # ">&" operator, and avoid the shell entirely so that file names are
    # passed through unmodified.
    devnull = open(os.devnull, "w")
    try:
        Popen(["root", "-l", "-b", "-q", macro_call],
              stdout=devnull, stderr=devnull).wait()
    except OSError as err:
        # Stay best-effort like the original os.system() call: the missing
        # output file tells the caller that the conversion failed.
        logme("ERROR: Could not run root to convert %s: %s", infile, err)
    finally:
        devnull.close()
51 
def sendmail(body="Hello from visDQMZipCastorVerifier"):
    """Send *body* to EMAIL through the local sendmail binary.

    The message carries a fixed subject naming HOSTNAME.  Failures are
    logged with logme() and never raised, because this function is called
    from error-handling paths.
    """
    message = ("To: %s\n"
               "Subject: File Collector on server %s has a Critical Error\n"
               "\n"  # blank line separating headers from body
               "%s\n" % (EMAIL, HOSTNAME, body))
    try:
        # Argument list instead of shell=True: no shell is needed to run
        # "sendmail -t" (recipients are read from the headers).
        scall = Popen([SENDMAIL, "-t"], stdin=PIPE, universal_newlines=True)
    except OSError as err:
        logme("ERROR: Could not start %s: %s", SENDMAIL, err)
        return

    # communicate() writes the message, closes the pipe and reaps the
    # child in one step.
    scall.communicate(message)
    rc = scall.returncode
    if rc != 0:
        logme("ERROR: Sendmail exit with status %s", rc)
63 
64 # --------------------------------------------------------------------
# Make sure the working directories exist.  Create first and tolerate an
# already existing directory afterwards: checking with os.path.exists()
# before os.makedirs() fails if another process creates the directory
# between the check and the creation.
for _required_dir in (TMPDROPBOX, TFILEDONEDIR, DROPBOX):
    try:
        os.makedirs(_required_dir)
    except OSError:
        if not os.path.isdir(_required_dir):
            raise
73 
# File names handled by the collector: "DQM_" or "Playback_" prefix, a
# four digit version, the subsystem name, a nine digit run number and an
# optional "_T<digits>" suffix.
_ROOTFILE_RE = re.compile(
    r'^(?:DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})'
    r'(|_T[0-9]*)\.root$')

# Read/write permission for user, group and others on delivered files.
_DROPBOX_MODE = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
                 stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)


def _alert(body):
    """Email *body* unless a mail was sent less than EMAILINTERVAL seconds ago."""
    global lastEmailSent
    now = datetime.now()
    # EMAILINTERVAL is a plain number of seconds, so compare it with the
    # elapsed seconds; subtracting it from a datetime raises TypeError.
    if (now - lastEmailSent).total_seconds() > EMAILINTERVAL:
        sendmail(body)
        lastEmailSent = now


def _md5_hexdigest(path, chunk_size=1 << 20):
    """Return the hex MD5 digest of the file at *path*, read in chunks."""
    digest = md5()
    with open(path, "rb") as src:
        for chunk in iter(lambda: src.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _find_new_files():
    """Walk COLLECTDIR and return {run: {subsystem: [paths]}} of pending files.

    A file whose name and size match an entry already stored under
    TFILEDONEDIR is reported and deleted instead of being returned.
    """
    found = {}
    for dirpath, _subdirs, filenames in os.walk(COLLECTDIR):
        for name in filenames:
            match = _ROOTFILE_RE.match(name)
            if not match:
                continue

            runnr = int(match.group("runnr"))
            subsystem = match.group("subsys")
            runstr = "%09d" % runnr
            donefile = "%s/%s/%s/%s" % (TFILEDONEDIR, runstr[0:3],
                                        runstr[3:6], name)
            path = "%s/%s" % (dirpath, name)
            if (os.path.exists(donefile)
                    and os.stat(donefile).st_size == os.stat(path).st_size):
                logme("WARNING: File %s was already processed but re-appeared",
                      path)
                os.remove(path)
                continue

            found.setdefault(runnr, {}).setdefault(subsystem, []).append(path)

    return found


def _discard(Tfile, finalTfile, kept):
    """Retire a file that will not be delivered; return the new *kept* flag.

    The first discarded file of a run/subsystem is preserved next to the
    processed files with a "_d" suffix, any further one is deleted.
    """
    if kept:
        os.remove(Tfile)
    else:
        shutil.move(Tfile, finalTfile + "_d")
    return True


def _publish(Tfile, finalTfile, finalTMPfile, subsystem, run):
    """Copy *Tfile* into the next free DROPBOX version directory.

    Makes up to RETRIES attempts.  Each attempt stages a copy plus an
    ".origin" file (md5, size, source path) in TMPDROPBOX and then, while
    holding the TMPDROPBOX lock, moves both into DROPBOX/V<nnnn> and moves
    the source to *finalTfile*.  Returns True on success, False when every
    attempt failed.
    """
    dropname = "DQM_V0001_%s_R%09d.root" % (subsystem, run)
    originTMPFile = "%s.origin" % finalTMPfile
    for attempt in range(RETRIES):
        originStr = "md5:%s %d %s" % (_md5_hexdigest(Tfile),
                                      os.stat(Tfile).st_size, Tfile)
        with open(originTMPFile, "w") as originFile:
            originFile.write(originStr)

        shutil.copy(Tfile, finalTMPfile)
        sent = False
        with open("%s/lock" % TMPDROPBOX, "a") as lFile:
            lockf(lFile, LOCK_EX)
            try:
                # The version is one more than the number of directories
                # below DROPBOX that already hold a file with this name.
                version = 1
                for _vdir, _vsubdirs, vfiles in os.walk(DROPBOX):
                    if dropname in vfiles:
                        version += 1

                versiondir = "%s/V%04d" % (DROPBOX, version)
                if not os.path.exists(versiondir):
                    os.makedirs(versiondir)

                finalfile = "%s/%s" % (versiondir, dropname)
                originFileName = "%s.origin" % finalfile
                if (os.path.exists(finalTMPfile)
                        and os.stat(finalTMPfile).st_size
                        == os.stat(Tfile).st_size):
                    os.rename(Tfile, finalTfile)
                    os.rename(finalTMPfile, finalfile)
                    os.rename(originTMPFile, originFileName)
                    os.chmod(finalfile, _DROPBOX_MODE)
                    os.chmod(originFileName, _DROPBOX_MODE)
                    logme("INFO: File %s has been successfully sent to the DROPBOX",
                          Tfile)
                    sent = True
            finally:
                # Always release the lock, also when an exception is on
                # its way up to the main loop.
                lockf(lFile, LOCK_UN)

        if sent:
            return True

        logme("ERROR: Problem transfering final file for run"
              " %09d. Retrying in %d", run, WAITTIME)
        if attempt == RETRIES - 1:
            _alert("ERROR: Problem transfering final file for run"
                   " %09d.\n Retrying in %d seconds" % (run, WAITTIME))

        # Sleep only after the lock was released so that other processes
        # using the same lock file are not blocked while we wait.
        time.sleep(WAITTIME)

    return False


def _process_subsystem(run, subsystem, files):
    """Deliver the best file of one run/subsystem and retire the others."""
    runstr = "%09d" % run
    finalTdir = "%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6])
    seed = HOSTNAME.replace("-", "t")[-6:]
    finalTMPfile = "%s/DQM_V0001_%s_R%09d.root.%s" % (TMPDROPBOX, subsystem,
                                                      run, seed)
    # Files without "_T" come first, followed by the "_T" files in
    # descending name order.  A sort key gives this order deterministically;
    # the former cmp function was not a consistent ordering.
    ordered = sorted(files, key=lambda path: ("_T" not in path, path),
                     reverse=True)
    done = False
    kept = False
    for Tfile in ordered:
        finalTfile = "%s/%s" % (finalTdir, Tfile.split("/")[-1])
        if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)

        # Remove a stale staging copy left by an earlier attempt.
        if os.path.exists(finalTMPfile):
            os.remove(finalTMPfile)

        if done:
            kept = _discard(Tfile, finalTfile, kept)
            continue

        if filecheck(Tfile) != 1:
            logme("INFO: File %s is incomplete looking for next"
                  " DQM_V*_%s_R%09d_T*.root valid file",
                  Tfile, subsystem, run)
            kept = _discard(Tfile, finalTfile, kept)
            continue

        if "Playback" in Tfile and "SiStrip" in Tfile:
            dqmfile = Tfile.replace('Playback', 'DQM')
            convert(Tfile, dqmfile)
            if not os.path.exists(dqmfile):
                logme("WARNING: Problem converting %s skiping", Tfile)
                shutil.move(Tfile, finalTfile + "_d")
                continue

            os.rename(Tfile, finalTfile.replace('Playback', 'Playback_full'))
            Tfile = dqmfile

        _publish(Tfile, finalTfile, finalTMPfile, subsystem, run)
        # As before, the remaining files are retired after the retry loop
        # has run, whether or not the delivery succeeded.
        done = True


def _collect_once():
    """Run one collection cycle; return False when there was nothing to do."""
    new_files = _find_new_files()
    if not new_files:
        return False

    tags = sorted(glob.glob('%s/tagfile_runend_*' % COLLECTDIR), reverse=True)
    if tags:
        # Parse the run number from the file name only, so underscores in
        # COLLECTDIR cannot shift the field index.
        last_ended_run = int(os.path.basename(tags[0]).split("_")[2])
    elif len(new_files) > 1:
        # No tag file: with files from several runs present, process
        # everything up to the second highest run number.
        last_ended_run = sorted(new_files, reverse=True)[1]
    else:
        return False

    for tag in tags:
        os.remove(tag)

    for run, subsystems in new_files.items():
        if run > last_ended_run:
            continue

        for subsystem, files in subsystems.items():
            _process_subsystem(run, subsystem, files)

    return True


while True:
    try:
        try:
            worked = _collect_once()
        except Exception as e:
            logme('ERROR: %s', e)
            _alert('ERROR: %s\n%s' % (e, format_exc()))
            print_exc()
            # Pause after a failure as well, so a persistent error does
            # not turn into a busy loop.
            worked = False

        if not worked:
            time.sleep(WAITTIME)

    except KeyboardInterrupt:
        sys.exit(0)
def logme(msg, args)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def filecheck(rootfile)
def convert(infile, ofile)
def sendmail(body="Hello from visDQMZipCastorVerifier")
double split
Definition: MVATrainer.cc:139