CMS 3D CMS Logo

fileCollector2.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 from __future__ import print_function
4 import os,time,sys,zipfile,re,shutil,stat
5 from fcntl import lockf, LOCK_EX, LOCK_UN
6 from hashlib import md5
7 from glob import glob
8 from datetime import datetime
9 
# --- Command line -----------------------------------------------------------
# Three positional arguments are required; fail with a usage message instead
# of an IndexError traceback when one of them is missing.
if len(sys.argv) < 4:
    sys.stderr.write(
        "Usage: %s COLLECTING_DIR T_FILE_DONE_DIR DROPBOX\n" % sys.argv[0])
    sys.exit(2)

COLLECTING_DIR = sys.argv[1]   # Directory where to look for root files
T_FILE_DONE_DIR = sys.argv[2]  # Directory where to place processed root files
DROPBOX = sys.argv[3]          # Directory where the collected files are sent

# --- Settings ---------------------------------------------------------------
# Resolve the script directory to an absolute path: a bare
# os.path.dirname(__file__) is '' when the script is started by file name
# only, which would turn the paths built from it into '/.stop',
# '/filechk.sh' and '/convert.sh'.
EXEDIR = os.path.dirname(os.path.abspath(__file__))
COLLECTOR_WAIT_TIME = 10     # seconds between collector cycles
WAIT_TIME_FILE_PT = 60 * 15  # seconds to wait before picking up lost files
TMP_DROPBOX = os.path.join(DROPBOX, ".uploading")
KEEP = 2     # number of _d files to keep
RETRIES = 3  # number of attempts to send a file
STOP_FILE = "%s/.stop" % EXEDIR

# Reopen stdout so log lines show up immediately. Python 3 refuses
# unbuffered text streams (ValueError), so fall back to line buffering there.
try:
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except ValueError:
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)

# Exported through the environment for child processes started by this script.
os.environ["WorkDir"] = EXEDIR
23 
def logme(msg, *args):
    """Print a timestamped log line tagged with the script name and PID.

    msg  -- message text, optionally containing %-style placeholders.
    args -- values substituted into msg. When no args are given, msg is
            printed verbatim, so an already formatted message containing a
            stray '%' (for example from a file name) cannot raise.
    """
    procid = "[%s/%d]" % (__file__.rsplit("/", 1)[-1], os.getpid())
    if args:
        msg = msg % args
    print(datetime.now(), procid, msg)
27 
def filecheck(rootfile):
    """Run EXEDIR/filechk.sh on rootfile and return True only if it passes.

    The last whitespace-separated token printed by the script is the verdict:
      '(int)(-1)' -> logged as corrupted (zombie file), returns False
      '(int)0'    -> logged as incomplete, returns False
      '(int)1'    -> returns True
    Any other token, or no output at all, returns False.
    """
    cmd = EXEDIR + '/filechk.sh ' + rootfile
    pipe = os.popen(cmd)
    try:
        output = pipe.read().split()
    finally:
        # Always reap the child and release the pipe.
        pipe.close()

    if not output:
        # The script printed nothing (missing script, crash, ...): treat the
        # file as unusable instead of failing on an empty list.
        logme("ERROR: File %s could not be checked, filechk.sh gave no output",
              rootfile)
        return False

    tag = output[-1]
    if tag == '(int)(-1)':
        logme("ERROR: File %s corrupted (isZombi)", rootfile)
    elif tag == '(int)0':
        logme("ERROR: File %s is incomplete", rootfile)

    return tag == '(int)1'
42 
def isFileOpen(fName):
    """Return True if a process owned by the current user has fName open.

    Walks /proc/<pid>/fd for every numeric entry of /proc whose owner is the
    calling user and compares each descriptor's link target with the real
    path of fName. Processes that vanish or cannot be inspected while being
    scanned are skipped.
    """
    fName = os.path.realpath(fName)
    myUid = os.getuid()
    for pid in os.listdir('/proc'):
        if not pid.isdigit():
            continue

        procDir = os.path.join('/proc', pid)
        fdDir = os.path.join(procDir, 'fd')
        try:
            # Only look at our own processes.
            if os.stat(procDir).st_uid != myUid:
                continue

            if os.stat(fdDir).st_uid != myUid:
                continue

            for fd in os.listdir(fdDir):
                fdName = os.path.join(fdDir, fd)
                if os.path.islink(fdName) and os.readlink(fdName) == fName:
                    return True
        except OSError:
            # The process exited or its entries are not accessible.
            continue

    return False
69 
def convert(infile, ofile):
    """Run EXEDIR/convert.sh through the shell with infile and ofile as its
    two arguments. The exit status of the command is ignored."""
    command = " ".join([EXEDIR + '/convert.sh', infile, ofile])
    os.system(command)
73 
def uploadFile(fName, subsystem, run):
    """Copy fName into DROPBOX as the next version of the run's root file.

    Steps:
      1. Copy fName to a host-specific temporary name under TMP_DROPBOX and
         write a companion '.origin' file with its md5, size and source path.
      2. While holding an exclusive lock on TMP_DROPBOX/lock, count the
         directories under DROPBOX that already hold
         DQM_V0001_<subsystem>_R<run>.root to pick the version number, create
         DROPBOX/<version> if needed and move both files there.

    Returns True on success. Returns False when the temporary copy is missing
    or has the wrong size, or when moving it into place fails.
    """
    # HOSTNAME is not always exported to child processes; fall back to the
    # kernel node name rather than failing on None.
    hname = os.getenv("HOSTNAME") or os.uname()[1]
    seed = hname.replace("-", "t")[-6:]
    finalTMPfile = "%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX, subsystem, run, seed)
    if os.path.exists(finalTMPfile):
        os.remove(finalTMPfile)

    # Hash in binary mode and in chunks so the whole file is never held in
    # memory and the handle is always closed.
    md5Digest = md5()
    with open(fName, "rb") as src:
        for chunk in iter(lambda: src.read(1024 * 1024), b""):
            md5Digest.update(chunk)

    originStr = "md5:%s %d %s" % (md5Digest.hexdigest(), os.stat(fName).st_size, fName)
    originTMPFile = "%s.origin" % finalTMPfile
    with open(originTMPFile, "w") as originFile:
        originFile.write(originStr)

    shutil.copy(fName, finalTMPfile)
    if not os.path.exists(finalTMPfile) or os.stat(finalTMPfile).st_size != os.stat(fName).st_size:
        return False

    rootName = 'DQM_V0001_%s_R%s.root' % (subsystem, run)
    # Read/write for user, group and others.
    fileMode = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
                stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)

    lFile = open("%s/lock" % TMP_DROPBOX, "a")
    try:
        lockf(lFile, LOCK_EX)

        # Every directory that already contains this file is an older version.
        version = 1
        for vdir, vsubdir, vfiles in os.walk(DROPBOX):
            if rootName in vfiles:
                version += 1

        versionDir = "%s/%04d" % (DROPBOX, version)
        if not os.path.exists(versionDir):
            os.makedirs(versionDir)
            # Mode must be octal: setgid + rwxrwxr-x. Decimal 2775 would set
            # an unrelated permission pattern.
            os.chmod(versionDir, 0o2775)

        finalfile = "%s/%s" % (versionDir, rootName)
        originFileName = "%s.origin" % finalfile
        try:
            os.rename(finalTMPfile, finalfile)
            os.rename(originTMPFile, originFileName)
            os.chmod(finalfile, fileMode)
            os.chmod(originFileName, fileMode)
        except OSError:
            logme("ERROR: File %s upload failed to the DROPBOX", fName)
            return False

        logme("INFO: File %s has been successfully sent to the DROPBOX", fName)
        return True
    finally:
        # Release the lock on every exit path, including unexpected errors.
        lockf(lFile, LOCK_UN)
        lFile.close()
121 
def processSiStrip(fName, finalTfile):
    """Convert a SiStrip 'Playback' file to its 'DQM' counterpart if needed.

    fName      -- path of the file about to be uploaded.
    finalTfile -- path the file will have in the processed-files area.

    Returns (fileToUpload, ok). Files that are not SiStrip Playback files are
    returned unchanged with ok=True. For SiStrip Playback files convert() is
    run; on success the original is moved aside under a 'Playback_full' name
    and the converted path is returned, on failure the original is moved to
    finalTfile + '_d' and ok is False.

    NOTE: the subsystem is looked up through the module-level NEW[rFile], so
    this must be called from the main loop while rFile is the current entry.
    """
    dqmfile = fName
    if "Playback" not in fName or "SiStrip" != NEW[rFile]["subSystem"]:
        return (dqmfile, True)

    dqmfile = fName.replace('Playback', 'DQM')
    convert(fName, dqmfile)
    if not os.path.exists(dqmfile):
        # Report the file actually being converted; the loop variable of the
        # caller may be undefined or stale here.
        logme("ERROR: Problem converting %s skiping", fName)
        shutil.move(fName, finalTfile + "_d")
        return (dqmfile, False)

    os.rename(fName, finalTfile.replace('Playback', 'Playback_full'))
    return (dqmfile, True)
135 
####### ENDLESS LOOP WITH SLEEP
# Finished per-run file:  (DQM|Playback)_V####_<subsys>_R#########.root
ROOT_FILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$')
# Intermediate file:      (DQM|Playback)_V####_<subsys>_R#########_T########.root
T_FILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$')

# Bookkeeping: full path of a run file -> its run number, subsystem,
# processing state and the _T files that belong to it.
NEW = {}
LAST_SEEN_RUN = "0"
LAST_FILE_UPLOADED = time.time()
if not os.path.exists(TMP_DROPBOX):
    os.makedirs(TMP_DROPBOX)

while True:
    # Check if you need to stop.
    if os.path.exists(STOP_FILE):
        logme("INFO: Stop file found, quitting")
        sys.exit(0)

    # Clean up tagfile_runend files; their use is deprecated and this step
    # should eventually be removed.
    for tag in sorted(glob('%s/tagfile_runend_*' % COLLECTING_DIR), reverse=True):
        os.remove(tag)

    # Register every finished run file found in the collecting area.
    for dirpath, subdirs, files in os.walk(COLLECTING_DIR):
        for f in files:
            fMatch = ROOT_FILE_RE.match(f)
            if not fMatch:
                continue

            runnr = fMatch.group("runnr")
            subsystem = fMatch.group("subsys")
            NEW.setdefault("%s/%s" % (dirpath, f), {"runNumber": runnr,
                                                    "subSystem": subsystem,
                                                    "Processed": False,
                                                    "TFiles": []})
            if int(runnr) > int(LAST_SEEN_RUN):
                LAST_SEEN_RUN = runnr

    # Add the respective _T files, in case the final root file is damaged.
    for rFile in list(NEW):
        if NEW[rFile]["TFiles"]:
            continue

        runnr = NEW[rFile]["runNumber"]
        subsystem = NEW[rFile]["subSystem"]
        # The subsystem comes from a '.*' capture, so escape it before using
        # it inside another pattern; the dot before 'root' is literal too.
        tFileRe = re.compile(r'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}\.root$' % (
            re.escape(subsystem), runnr))
        for dirpath, subdirs, files in os.walk(COLLECTING_DIR):
            for f in files:
                if tFileRe.match(f):
                    NEW[rFile]["TFiles"].append("%s/%s" % (dirpath, f))

        # Newest _T file first.
        NEW[rFile]["TFiles"].sort(reverse=True)

    # Process files
    for rFile in list(NEW):
        if isFileOpen(rFile):
            logme("INFO: File %s is open", rFile)
            continue

        transferred = False
        run = NEW[rFile]["runNumber"]
        subsystem = NEW[rFile]["subSystem"]
        finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
        if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)

        if not filecheck(rFile):
            # The run file is unusable: set it aside and try its _T files,
            # newest first, until one of them is uploaded.
            os.rename(rFile, "%s/%s_d" % (finalTdir, os.path.basename(rFile)))
            for Tfile in NEW[rFile]["TFiles"]:
                finalTfile = "%s/%s" % (finalTdir, os.path.basename(Tfile))
                if transferred:
                    break

                if not filecheck(Tfile):
                    if os.path.exists(Tfile):
                        shutil.move(Tfile, finalTfile + "_d")
                    continue

                fToUpload, converted = processSiStrip(Tfile, finalTfile)
                if not converted:
                    continue

                for i in range(RETRIES):
                    if uploadFile(fToUpload, subsystem, run):
                        NEW[rFile]["Processed"] = transferred = True
                        LAST_FILE_UPLOADED = time.time()
                        os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
                        break

            # Whether or not a _T file went through, this entry is finished.
            NEW[rFile]['Processed'] = True
            continue

        finalTfile = "%s/%s" % (finalTdir, os.path.basename(rFile))
        fToUpload, converted = processSiStrip(rFile, finalTfile)
        if not converted:
            continue

        for i in range(RETRIES):
            if uploadFile(fToUpload, subsystem, run):
                NEW[rFile]["Processed"] = transferred = True
                LAST_FILE_UPLOADED = time.time()
                os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
                break

    # Clean up COLLECTING_DIR
    for rFile in list(NEW):
        if not NEW[rFile]["Processed"]:
            continue

        run = NEW[rFile]["runNumber"]
        subsystem = NEW[rFile]["subSystem"]
        finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
        for Tfile in NEW[rFile]["TFiles"]:
            if os.path.exists(Tfile):
                os.rename(Tfile, "%s/%s_d" % (finalTdir, os.path.basename(Tfile)))

        # Enforce KEEP: retain at most KEEP '_d' files, preferring the newest
        # _T files (highest names first); '_d' files without '_T' go first.
        doneFiles = glob("%s/*_%s_R%s*_d" % (finalTdir, subsystem, run))
        tDone = sorted((p for p in doneFiles if "_T" in p), reverse=True)
        otherDone = sorted((p for p in doneFiles if "_T" not in p), reverse=True)
        for f in (tDone + otherDone)[KEEP:]:
            os.remove(f)

    # Drop fully processed entries. Iterate over a snapshot of the keys so
    # the dictionary can be modified safely.
    for rFile in list(NEW):
        if NEW[rFile]['Processed']:
            del NEW[rFile]

    # Find and process orphan _T files.
    if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
        for dirpath, subdirs, files in os.walk(COLLECTING_DIR):
            for f in files:
                fMatch = T_FILE_RE.match(f)
                if not fMatch:
                    continue

                runnr = fMatch.group("runnr")
                subsystem = fMatch.group("subsys")
                # Compare numerically, as done when LAST_SEEN_RUN is updated.
                if int(runnr) > int(LAST_SEEN_RUN):
                    continue

                # Name the final run file would have (the _T suffix removed).
                tmpFName = "%s/%s.root" % (dirpath, f.rsplit("_", 1)[0])
                if os.path.exists(tmpFName):
                    continue

                finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, runnr[0:3], runnr[3:6])
                fName = "%s/%s" % (dirpath, f)
                doneMtimes = [os.stat(p).st_mtime
                              for p in glob("%s/*_%s_R%s*" % (finalTdir, subsystem, runnr))]
                # Something newer for this run was already handled: the
                # orphan is obsolete.
                if doneMtimes and max(doneMtimes) > os.stat(fName).st_mtime:
                    os.remove(fName)
                    continue

                # An empty run file fails filecheck on the next cycle, which
                # sends the loop through the _T files of this run.
                logme("INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
                with open(tmpFName, "w+"):
                    pass

    time.sleep(COLLECTOR_WAIT_TIME)
def processSiStrip(fName, finalTfile)
def logme(msg, args)
def filecheck(rootfile)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def isFileOpen(fName)
def uploadFile(fName, subsystem, run)
def convert(infile, ofile)
double split
Definition: MVATrainer.cc:139