CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
fileCollector2.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 import os,time,sys,zipfile,re,shutil,stat
4 from fcntl import lockf, LOCK_EX, LOCK_UN
5 from hashlib import md5
6 from glob import glob
7 from datetime import datetime
8 
COLLECTING_DIR = sys.argv[1]  # Directory where to look for root files
T_FILE_DONE_DIR = sys.argv[2]  # Directory where to place processed root files
DROPBOX = sys.argv[3]  # Directory where the collected files are sent.

# Directory holding this script and its helper scripts (filechk.sh, convert.sh).
# abspath() matters: when started as "python fileCollector2.py",
# os.path.dirname(__file__) is "", which would turn EXEDIR + '/filechk.sh'
# into '/filechk.sh' and the stop file into '/.stop'.
EXEDIR = os.path.dirname(os.path.abspath(__file__))
COLLECTOR_WAIT_TIME = 10  # seconds to sleep between collector cycles
WAIT_TIME_FILE_PT = 60 * 15  # seconds without uploads before orphan _T files are picked up
TMP_DROPBOX = os.path.join(DROPBOX, ".uploading")  # staging area for files being uploaded
KEEP = 2  # number of _d files to keep
RETRIES = 3  # number of retries to send a file
STOP_FILE = "%s/.stop" % EXEDIR  # the main loop exits when this file exists
# Reopen stdout unbuffered (Python 2 semantics) so log lines appear immediately.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
# Exported to the child scripts; presumably read by filechk.sh/convert.sh -- TODO confirm.
os.environ["WorkDir"] = EXEDIR
22 
def logme(msg, *args):
    """Print a timestamped log line tagged with "[<script name>/<pid>]".

    *msg* is %-interpolated with *args* only when args are given, so callers
    that pass an already formatted message containing a literal '%' (e.g. a
    file path) do not trigger a formatting TypeError.
    """
    procid = "[%s/%d]" % (__file__.rsplit("/", 1)[-1], os.getpid())
    text = msg % args if args else msg
    print("%s %s %s" % (datetime.now(), procid, text))
26 
def filecheck(rootfile):
    """Validate *rootfile* by running EXEDIR/filechk.sh on it.

    The last whitespace-separated token printed by the script decides:
    '(int)1' means the file is good (returns True); '(int)0' means it is
    incomplete and '(int)(-1)' that it is corrupted (zombie).  Those, any
    other token, empty output, or a script that cannot be launched all
    return False; every failure is logged.
    """
    import subprocess

    try:
        # Argument list, no shell: paths containing spaces or shell
        # metacharacters reach the script unchanged.
        proc = subprocess.Popen([EXEDIR + '/filechk.sh', rootfile],
                                stdout=subprocess.PIPE)
        output = proc.communicate()[0]
    except OSError as err:
        logme("ERROR: Could not run filechk.sh on %s: %s", rootfile, err)
        return False

    tokens = output.decode("utf-8", "replace").split()
    if not tokens:
        logme("ERROR: filechk.sh produced no output for %s", rootfile)
        return False

    tag = tokens[-1]
    if tag == '(int)(-1)':
        logme("ERROR: File %s corrupted (isZombi)", rootfile)
        return False
    if tag == '(int)0':
        logme("ERROR: File %s is incomplete", rootfile)
        return False
    if tag != '(int)1':
        logme("ERROR: Unexpected filechk.sh result %s for %s", tag, rootfile)
        return False
    return True
41 
def isFileOpen(fName):
    """Return True if a process owned by the current user has *fName* open.

    Scans /proc/<pid>/fd for every process whose /proc entry (and fd
    directory) is owned by os.getuid(), comparing each descriptor's link
    target with the resolved path of fName.  Processes that exit or whose
    fd table cannot be read during the scan are skipped.
    """
    fName = os.path.realpath(fName)
    myUid = os.getuid()
    for pid in os.listdir('/proc'):
        if not pid.isdigit():
            continue
        procDir = os.path.join('/proc', pid)
        fdDir = os.path.join(procDir, 'fd')
        try:
            if os.stat(procDir).st_uid != myUid or os.stat(fdDir).st_uid != myUid:
                continue
            fds = os.listdir(fdDir)
        except OSError:
            # Process exited or its fd table is not readable: skip it.
            continue
        for fd in fds:
            try:
                if os.readlink(os.path.join(fdDir, fd)) == fName:
                    return True
            except OSError:
                # Descriptor closed between listdir() and readlink();
                # keep checking this process's other descriptors.
                continue
    return False
68 
def convert(infile, ofile):
    """Run EXEDIR/convert.sh to convert *infile* into *ofile*.

    Returns the script's exit status, or 127 if it could not be started.
    Callers in this file ignore the status and check that *ofile* exists
    instead.
    """
    import subprocess

    try:
        # Argument list, no shell: paths containing spaces or shell
        # metacharacters reach the script unchanged.
        return subprocess.call([EXEDIR + '/convert.sh', infile, ofile])
    except OSError as err:
        logme("ERROR: Could not run convert.sh on %s: %s", infile, err)
        return 127
72 
def _md5_hexdigest(path, chunk_size=1 << 20):
    """Return the hex MD5 digest of the file at *path*, reading it in chunks."""
    digest = md5()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def uploadFile(fName, subsystem, run):
    """Publish *fName* into DROPBOX as DQM_V0001_<subsystem>_R<run>.root.

    The file and an ".origin" sidecar containing "md5:<hexdigest> <size> <path>"
    are first staged in TMP_DROPBOX under a host-derived suffix.  Then, while
    holding an exclusive lockf() lock on TMP_DROPBOX/lock, both are renamed
    into DROPBOX/<version:04d>/.  version is 1 plus the number of directories
    under DROPBOX that already contain a file of the same name.

    Returns True on success, and False if the staged copy is incomplete or the
    final rename/chmod fails.
    """
    import socket

    # HOSTNAME may be unset in the environment; fall back to the system host
    # name so a staging suffix can still be derived.
    hname = os.getenv("HOSTNAME") or socket.gethostname()
    seed = hname.replace("-", "t")[-6:]
    baseName = "DQM_V0001_%s_R%s.root" % (subsystem, run)
    finalTMPfile = "%s/%s.%s" % (TMP_DROPBOX, baseName, seed)
    if os.path.exists(finalTMPfile):
        os.remove(finalTMPfile)

    originStr = "md5:%s %d %s" % (_md5_hexdigest(fName), os.stat(fName).st_size, fName)
    originTMPFile = "%s.origin" % finalTMPfile
    with open(originTMPFile, "w") as originFile:
        originFile.write(originStr)
    shutil.copy(fName, finalTMPfile)
    if (not os.path.exists(finalTMPfile)
            or os.stat(finalTMPfile).st_size != os.stat(fName).st_size):
        return False

    fileMode = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
                stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
    lFile = open("%s/lock" % TMP_DROPBOX, "a")
    try:
        lockf(lFile, LOCK_EX)
        try:
            version = 1
            for _vdir, _vsubdirs, vfiles in os.walk(DROPBOX):
                if baseName in vfiles:
                    version += 1

            versionDir = "%s/%04d" % (DROPBOX, version)
            if not os.path.exists(versionDir):
                os.makedirs(versionDir)
                # setgid + rwxrwxr-x.  The mode must be octal: the former
                # decimal 2775 (== 0o5327) set setuid/sticky and world-write.
                os.chmod(versionDir, 0o2775)

            finalfile = "%s/%s" % (versionDir, baseName)
            originFileName = "%s.origin" % finalfile
            try:
                os.rename(finalTMPfile, finalfile)
                os.rename(originTMPFile, originFileName)
                os.chmod(finalfile, fileMode)
                os.chmod(originFileName, fileMode)
            except (OSError, IOError):
                logme("ERROR: File %s upload failed to the DROPBOX", fName)
                return False
        finally:
            # Always release the lock, even if the version scan or mkdir fails.
            lockf(lFile, LOCK_UN)
    finally:
        lFile.close()

    logme("INFO: File %s has been successfully sent to the DROPBOX", fName)
    return True
120 
def processSiStrip(fName, finalTfile, subsystem=None):
    """Prepare *fName* for upload, converting SiStrip Playback files first.

    For a SiStrip file whose name contains 'Playback', convert.sh writes a
    copy named with 'Playback' -> 'DQM' in the same directory.  The Playback
    original is then archived as finalTfile with 'Playback' -> 'Playback_full'
    in its name.  If the conversion produced no output, fName is archived as
    finalTfile + "_d" instead.  All other files are returned untouched.

    Args:
        fName: path of the ROOT file to process.
        finalTfile: archive path for fName (archive directory + basename).
        subsystem: subsystem of the run.  When None, it is looked up in the
            global NEW table under the global rFile key (legacy behavior).

    Returns:
        (path_to_upload, ok), where ok is False when the conversion failed.
    """
    fDir, fBase = os.path.split(fName)
    # Only inspect the file name: a directory that happens to contain
    # "Playback" must neither trigger a conversion nor be rewritten.
    if "Playback" not in fBase:
        return (fName, True)
    if subsystem is None:
        subsystem = NEW[rFile]["subSystem"]
    if subsystem != "SiStrip":
        return (fName, True)

    dqmfile = os.path.join(fDir, fBase.replace('Playback', 'DQM'))
    convert(fName, dqmfile)
    if not os.path.exists(dqmfile):
        # Log the file actually being converted (previously a stale global).
        logme("ERROR: Problem converting %s skiping", fName)
        shutil.move(fName, finalTfile + "_d")
        return (dqmfile, False)

    tDir, tBase = os.path.split(finalTfile)
    os.rename(fName, os.path.join(tDir, tBase.replace('Playback', 'Playback_full')))
    return (dqmfile, True)
134 
####### ENDLESS LOOP WITH SLEEP

# Final (non-_T) ROOT files: <DQM|Playback>_V<4 digits>_<subsystem>_R<9-digit run>.root
FINAL_FILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$')
# _T files of any subsystem/run, used to detect orphans.
TFILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$')


def _done_dir(run):
    """Return the archive directory for *run*: T_FILE_DONE_DIR/<run[0:3]>/<run[3:6]>."""
    return "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])


def _find_tfiles(subsystem, run):
    """Return the _T files in COLLECTING_DIR for (subsystem, run), reverse-sorted by path."""
    # re.escape: the subsystem is data, not a regex fragment.
    pattern = re.compile(r'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}\.root$'
                         % (re.escape(subsystem), run))
    found = []
    for dirpath, _subdirs, names in os.walk(COLLECTING_DIR):
        for name in names:
            if pattern.match(name):
                found.append("%s/%s" % (dirpath, name))
    found.sort(reverse=True)
    return found


def _upload_with_retries(fToUpload, subsystem, run, finalTdir):
    """Try uploadFile() up to RETRIES times; on success move fToUpload into finalTdir.

    Returns True if the upload succeeded, False otherwise.
    """
    for _attempt in range(RETRIES):
        if uploadFile(fToUpload, subsystem, run):
            os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
            return True
    return False


def _enforce_keep(finalTdir, subsystem, run):
    """Delete all but KEEP of the archived "_d" files for (subsystem, run).

    _T files rank first (reverse-sorted by path); other "_d" files (e.g. a
    corrupted final file) rank last, so they are deleted first.  The "_T"
    test looks only at the basename so the archive path cannot affect it.
    """
    done = glob("%s/*_%s_R%s*_d" % (finalTdir, subsystem, run))
    tfiles = sorted((p for p in done if "_T" in os.path.basename(p)), reverse=True)
    others = sorted(p for p in done if "_T" not in os.path.basename(p))
    for path in (tfiles + others)[KEEP:]:
        os.remove(path)


def _flag_orphan_tfiles():
    """Create empty placeholder final files for _T files whose final file is missing.

    An empty placeholder fails filecheck() on a later cycle, which makes the
    main loop fall back to the run's _T files.  A _T file older than the
    newest archived file of the same run/subsystem is deleted instead.  Runs
    newer than LAST_SEEN_RUN are skipped.
    """
    for dirpath, _subdirs, names in os.walk(COLLECTING_DIR):
        for name in names:
            fMatch = TFILE_RE.match(name)
            if not fMatch:
                continue

            runnr = fMatch.group("runnr")
            subsystem = fMatch.group("subsys")
            # Numeric comparison, consistent with how LAST_SEEN_RUN is updated.
            if int(runnr) > int(LAST_SEEN_RUN):
                continue

            tmpFName = "%s/%s.root" % (dirpath, name.rsplit("_", 1)[0])
            if os.path.exists(tmpFName):
                continue

            fList = sorted(glob("%s/*_%s_R%s*" % (_done_dir(runnr), subsystem, runnr)),
                           key=os.path.getmtime)
            fName = "%s/%s" % (dirpath, name)
            if fList and os.path.getmtime(fList[-1]) > os.path.getmtime(fName):
                os.remove(fName)
                continue

            logme("INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
            open(tmpFName, "w").close()


NEW = {}
LAST_SEEN_RUN = "0"
LAST_FILE_UPLOADED = time.time()
if not os.path.exists(TMP_DROPBOX):
    os.makedirs(TMP_DROPBOX)

while True:
    # Check if you need to stop.
    if os.path.exists(STOP_FILE):
        logme("INFO: Stop file found, quitting")
        sys.exit(0)

    # Clean up tagfile_runend files; this should be removed as their use is deprecated.
    for tag in sorted(glob('%s/tagfile_runend_*' % COLLECTING_DIR), reverse=True):
        os.remove(tag)

    # Register every final ROOT file currently present in COLLECTING_DIR.
    for dirpath, _subdirs, names in os.walk(COLLECTING_DIR):
        for name in names:
            fMatch = FINAL_FILE_RE.match(name)
            if not fMatch:
                continue
            runnr = fMatch.group("runnr")
            NEW.setdefault("%s/%s" % (dirpath, name), {"runNumber": runnr,
                                                       "subSystem": fMatch.group("subsys"),
                                                       "Processed": False,
                                                       "TFiles": []})
            if int(runnr) > int(LAST_SEEN_RUN):
                LAST_SEEN_RUN = runnr

    # Attach the matching _T files, in case the final root file is damaged.
    for info in NEW.values():
        if not info["TFiles"]:
            info["TFiles"] = _find_tfiles(info["subSystem"], info["runNumber"])

    # Process files (iterate over a copy: entries may be dropped).
    for rFile in list(NEW):
        info = NEW[rFile]
        if not os.path.exists(rFile):
            # The file vanished since it was registered (e.g. a failed SiStrip
            # conversion archived it).  Forget it instead of crashing on the
            # rename below; orphan detection will recover its _T files.
            logme("INFO: File %s no longer exists, dropping it", rFile)
            del NEW[rFile]
            continue
        if isFileOpen(rFile):
            logme("INFO: File %s is open", rFile)
            continue

        run = info["runNumber"]
        subsystem = info["subSystem"]
        finalTdir = _done_dir(run)
        if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)

        if not filecheck(rFile):
            # The final file is unusable: archive it as _d and upload the first
            # _T file (in TFiles order) that passes filecheck.
            os.rename(rFile, "%s/%s_d" % (finalTdir, os.path.basename(rFile)))
            for Tfile in info["TFiles"]:
                finalTfile = "%s/%s" % (finalTdir, os.path.basename(Tfile))
                if not filecheck(Tfile):
                    if os.path.exists(Tfile):
                        shutil.move(Tfile, finalTfile + "_d")
                    continue

                fToUpload, converted = processSiStrip(Tfile, finalTfile)
                if not converted:
                    continue

                if _upload_with_retries(fToUpload, subsystem, run, finalTdir):
                    LAST_FILE_UPLOADED = time.time()
                    break

            # Mark processed whether or not a _T file was uploaded; the cleanup
            # step below archives the remaining _T files as _d.
            info["Processed"] = True
            continue

        finalTfile = "%s/%s" % (finalTdir, os.path.basename(rFile))
        fToUpload, converted = processSiStrip(rFile, finalTfile)
        if not converted:
            continue

        if _upload_with_retries(fToUpload, subsystem, run, finalTdir):
            info["Processed"] = True
            LAST_FILE_UPLOADED = time.time()

    # Clean up COLLECTING_DIR: archive leftover _T files of processed runs.
    for info in NEW.values():
        if not info["Processed"]:
            continue
        run = info["runNumber"]
        finalTdir = _done_dir(run)
        for Tfile in info["TFiles"]:
            if os.path.exists(Tfile):
                os.rename(Tfile, "%s/%s_d" % (finalTdir, os.path.basename(Tfile)))
        _enforce_keep(finalTdir, info["subSystem"], run)

    # Forget files that have been fully processed.
    for rFile in [r for r, info in NEW.items() if info["Processed"]]:
        del NEW[rFile]

    # Find and process orphan _T files.
    if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
        _flag_orphan_tfiles()

    time.sleep(COLLECTOR_WAIT_TIME)
double split
Definition: MVATrainer.cc:139