3 import os,time,sys,zipfile,re,shutil,stat
4 from fcntl
import lockf, LOCK_EX, LOCK_UN
5 from hashlib
import md5
7 from datetime
import datetime
# Command-line arguments: argv[1] is the directory walked for ROOT files,
# argv[2] is the root under which handled files are moved afterwards.
9 COLLECTING_DIR = sys.argv[1]
10 T_FILE_DONE_DIR = sys.argv[2]
# Directory containing this script; the helper scripts and the stop file are
# looked up relative to it.
13 EXEDIR = os.path.dirname(__file__)
# Value passed to time.sleep() at the end of the collection cycle (seconds).
14 COLLECTOR_WAIT_TIME = 10
# 15 minutes, in seconds; compared against the time elapsed since
# LAST_FILE_UPLOADED before the orphan "_T" file scan runs.
15 WAIT_TIME_FILE_PT = 60 * 15
# NOTE(review): DROPBOX is not assigned in the visible lines -- presumably it
# is defined on a line missing from this extract; confirm.
16 TMP_DROPBOX = os.path.join(DROPBOX,
".uploading")
# Path whose existence is checked in the collection cycle ("Stop file found").
19 STOP_FILE =
"%s/.stop" % EXEDIR
# Reopen stdout with buffer size 0 (third argument of Python 2 os.fdopen).
20 sys.stdout = os.fdopen(sys.stdout.fileno(),
'w', 0)
# Export the script directory as the WorkDir environment variable.
21 os.environ[
"WorkDir"] = EXEDIR
# NOTE(review): the enclosing "def" line is not part of this extract; msg and
# args are presumably the parameters of logme(), which the rest of the file
# calls -- confirm against the full file.
# Tag of the form "[<script file name>/<pid>]" used as a log prefix.
24 procid =
"[%s/%d]" % (__file__.rsplit(
"/", 1)[-1], os.getpid())
# Python 2 print statement: timestamp, process tag, then the message
# formatted with the supplied arguments.
25 print datetime.now(), procid, msg % args
# NOTE(review): fragment of a file-check routine; its "def" line and several
# body lines (including the assignment of tag) are missing from this extract.
# Build a command line for the filechk.sh helper located next to this script.
# NOTE(review): rootfile is concatenated unquoted into the command string.
28 cmd = EXEDIR +
'/filechk.sh ' + rootfile
# Run the command and split its standard output on whitespace.
29 a = os.popen(cmd).read().
split()
31 if tag ==
'(int)(-1)':
32 logme(
"ERROR: File %s corrupted (isZombi)", rootfile)
35 logme(
"ERROR: File %s is incomplete", rootfile)
# NOTE(review): fragment of a routine that inspects /proc; its "def" line and
# the bodies of the ownership checks are missing from this extract.
# Canonicalise the path (symlinks resolved).
43 fName = os.path.realpath(fName)
# Every entry of /proc is visited in sorted order.
44 pids=os.listdir(
'/proc')
45 for pid
in sorted(pids):
# Ownership test: owner of /proc/<pid> versus the current user id.
50 if os.stat(os.path.join(
'/proc',pid)).st_uid != os.getuid():
53 uid = os.stat(os.path.join(
'/proc',pid)).st_uid
54 fd_dir=os.path.join(
'/proc', pid,
'fd')
# Same ownership test for the process's fd directory.
55 if os.stat(fd_dir).st_uid != os.getuid():
# Each symlink in /proc/<pid>/fd is read to obtain its target.
# NOTE(review): what is done with the link target is not visible here.
58 for f
in os.listdir(fd_dir):
59 fdName = os.path.join(fd_dir, f)
60 if os.path.islink(fdName) :
61 link=os.readlink(fdName)
# NOTE(review): presumably the body of convert(infile, ofile); the "def" line
# and the line that runs cmd are not part of this extract.
# Build a command line for the convert.sh helper located next to this script.
# NOTE(review): infile/ofile are concatenated unquoted into the command string.
70 cmd = EXEDIR +
'/convert.sh ' + infile +
' ' +ofile
# NOTE(review): presumably the body of uploadFile(fName, subsystem, run); the
# "def" line and several body lines are missing from this extract.
# Suffix for the staging file: last six characters of $HOSTNAME with "-"
# replaced by "t".
# NOTE(review): os.getenv returns None when HOSTNAME is unset, in which case
# .replace() raises AttributeError.
74 hname = os.getenv(
"HOSTNAME")
75 seed=hname.replace(
"-",
"t")[-6:]
76 finalTMPfile=
"%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX,subsystem,run,seed)
# Remove a leftover staging file of the same name.
77 if os.path.exists(finalTMPfile):
78 os.remove(finalTMPfile)
# The whole file is read into memory to compute its MD5 (file() is the
# Python 2 builtin).
80 md5Digest=md5(
file(fName).read())
# Sidecar ".origin" text: "md5:<hexdigest> <size in bytes> <source path>".
81 originStr=
"md5:%s %d %s" % (md5Digest.hexdigest(),os.stat(fName).st_size,fName)
82 originTMPFile=
"%s.origin" % finalTMPfile
83 originFile=open(originTMPFile,
"w")
84 originFile.write(originStr)
# Copy into the staging area, then test existence and size of the copy.
86 shutil.copy(fName,finalTMPfile)
87 if not os.path.exists(finalTMPfile)
or not os.stat(finalTMPfile).st_size == os.stat(fName).st_size:
# Lock file inside the staging directory, opened in append mode.
# NOTE(review): the lockf calls themselves are not visible in this extract.
91 lFile=open(
"%s/lock" % TMP_DROPBOX ,
"a")
# Walk the DROPBOX tree, testing each directory's file list for the
# DQM_V0001_<subsystem>_R<run>.root name.
93 for vdir,vsubdir,vfiles
in os.walk(DROPBOX):
94 if 'DQM_V0001_%s_R%s.root' % (subsystem,run)
not in vfiles:
# NOTE(review): version is assigned on a line not visible here.
# Create the four-digit version directory when it is missing.
# NOTE(review): 2775 is a decimal literal (== 0o5327), not the octal mode
# 02775 -- confirm the intended permissions.
99 if not os.path.exists(
"%s/%04d" % (DROPBOX,version)):
100 os.makedirs(
"%s/%04d" % (DROPBOX,version))
101 os.chmod(
"%s/%04d" % (DROPBOX,version),2775)
103 finalfile=
"%s/%04d/DQM_V0001_%s_R%s.root" % (DROPBOX,version,subsystem,run)
104 originFileName=
"%s.origin" % finalfile
# Move the staged file and its sidecar into place, then make both readable
# and writable by user, group and others.
106 os.rename(finalTMPfile,finalfile)
107 os.rename(originTMPFile,originFileName)
108 os.chmod(finalfile,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
109 os.chmod(originFileName,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
# The two messages below are pre-formatted with "%" rather than passing
# fName as a separate logme argument.
113 logme(
"ERROR: File %s upload failed to the DROPBOX" % fName)
116 logme(
"INFO: File %s has been successfully sent to the DROPBOX" % fName)
# NOTE(review): presumably the body of processSiStrip(fName, finalTfile); the
# "def" line and the line between the replace() and the exists() check are
# missing from this extract.
# NOTE(review): NEW, rFile and Tfile are not parameters in the listed
# signature -- they are names used by the collection cycle; confirm scoping.
123 if "Playback" in fName
and "SiStrip" == NEW[rFile][
"subSystem"]:
# Target name: the same path with 'Playback' replaced by 'DQM'.
124 dqmfile = fName.replace(
'Playback',
'DQM')
# Failure path: the target file does not exist; the input is moved aside
# with a "_d" suffix and the returned flag is False.
126 if not os.path.exists(dqmfile):
127 logme(
"ERROR: Problem converting %s skiping" % Tfile)
128 shutil.move(fName,finalTfile+
"_d")
129 return (dqmfile,
False)
# Success path: the input is renamed to a 'Playback_full' name derived from
# finalTfile and the returned flag is True.
131 os.rename(fName,finalTfile.replace(
'Playback',
'Playback_full'))
133 return (dqmfile,
True)
# NOTE(review): top-level collection cycle. The embedded line numbers skip,
# so loop headers, branch bodies and the initialisation of NEW, RETRIES, KEEP
# and LAST_SEEN_RUN are missing from this extract; the comments below
# describe the visible lines only.
# Start the "last upload" clock and make sure the staging directory exists.
138 LAST_FILE_UPLOADED = time.time()
139 if not os.path.exists(TMP_DROPBOX):
140 os.makedirs(TMP_DROPBOX)
# Presence of the stop file is logged as a request to quit.
144 if os.path.exists(STOP_FILE):
145 logme(
"INFO: Stop file found, quitting")
# Tag files in the collecting directory, in reverse lexical order.
# NOTE(review): glob is not among the visible imports -- presumably imported
# on a line missing from this extract; confirm.
149 TAGS=sorted(glob(
'%s/tagfile_runend_*' % COLLECTING_DIR ),reverse=
True)
# Pass 1: find run-level ROOT files (names ending right after the 9-digit run
# number, i.e. without a "_T" part) and register them in NEW by full path.
153 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
155 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$',f)
157 runnr = fMatch.group(
"runnr")
158 subsystem=fMatch.group(
"subsys")
159 f =
"%s/%s" % (dir, f)
160 NEW.setdefault(f, {
"runNumber":runnr,
161 "subSystem":subsystem,
# Track the highest run number: compared as int, stored as the matched string.
164 if int(runnr) >
int(LAST_SEEN_RUN):
165 LAST_SEEN_RUN = runnr
# Pass 2: for each registered file, collect the "_T<8 digits>" files with the
# same subsystem and run number.
167 for rFile
in NEW.keys():
168 if len(NEW[rFile][
"TFiles"]):
172 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
174 runnr = NEW[rFile][
"runNumber"]
175 subsystem=NEW[rFile][
"subSystem"]
# NOTE(review): unlike the other two patterns in this cycle, the "." before
# "root" is not escaped here, so it matches any character.
176 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}.root$' % (
179 f =
"%s/%s" % (dir, f)
180 NEW[rFile][
"TFiles"].
append(f)
# The collected _T files are kept in reverse lexical order.
182 NEW[rFile][
"TFiles"].sort(reverse=
True)
# Pass 3: handle each registered file.
185 for rFile
in NEW.keys():
187 logme(
"INFO: File %s is open", rFile)
# Destination directory is derived from the run number:
# <done dir>/<digits 0-2>/<digits 3-5>.
191 run = NEW[rFile][
"runNumber"]
192 subsystem = NEW[rFile][
"subSystem"]
193 finalTdir=
"%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
194 if not os.path.exists(finalTdir):
195 os.makedirs(finalTdir)
# The run-level file is moved to the destination with a "_d" suffix
# (the condition guarding this line is not visible).
198 os.rename(rFile,
"%s/%s_d" % (finalTdir, os.path.basename(rFile)))
# Each still-existing _T file is moved to the destination with a "_d" suffix.
199 for Tfile
in NEW[rFile][
"TFiles"]:
200 finalTfile=
"%s/%s" % (finalTdir,os.path.basename(Tfile))
205 if os.path.exists(Tfile):
206 shutil.move(Tfile,finalTfile+
"_d")
# Attempts are bounded by RETRIES. The visible lines flag the entry as
# Processed, refresh the upload timestamp and move the uploaded file into the
# destination directory (the upload call and its success test are not visible).
213 for i
in range(RETRIES):
215 NEW[rFile][
"Processed"] = transferred =
True 216 LAST_FILE_UPLOADED = time.time()
217 os.rename(fToUpload,
"%s/%s" % (finalTdir, os.path.basename(fToUpload)))
220 NEW[rFile][
'Processed'] =
True 223 finalTfile=
"%s/%s" % (finalTdir,os.path.basename(rFile))
# Second bounded retry loop with the same visible bookkeeping as the first.
228 for i
in range(RETRIES):
230 NEW[rFile][
"Processed"] = transferred =
True 231 LAST_FILE_UPLOADED = time.time()
232 os.rename(fToUpload,
"%s/%s" % (finalTdir, os.path.basename(fToUpload)))
# Entries not flagged Processed: their remaining _T files are renamed into
# the destination directory with a "_d" suffix.
236 for rFile
in NEW.keys():
237 if not NEW[rFile][
"Processed"]:
240 run = NEW[rFile][
"runNumber"]
241 subsystem = NEW[rFile][
"subSystem"]
242 finalTdir=
"%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
243 for Tfile
in NEW[rFile][
"TFiles"]:
244 if os.path.exists(Tfile):
245 finalTfile=
"%s/%s_d" % (finalTdir,os.path.basename(Tfile))
246 os.rename(Tfile,finalTfile)
# "_d" files of this subsystem/run in the destination directory, ordered by a
# comparison function; the list is walked in reverse and its length checked
# against KEEP (the action taken is not visible).
# NOTE(review): sorted(cmp=...) and cmp() exist only in Python 2.
249 fList = sorted(glob(
"%s/*_%s_R%s*_d" % (finalTdir,subsystem, run)),cmp=
lambda x,y:
"_T" not in x
and 1
or (
"_T" in y
and ( -1 * cmp(x,y))))
250 for f
in fList[::-1]:
251 if len(fList) > KEEP:
# Entries flagged Processed are selected here (the action is not visible).
256 for rFile
in NEW.keys():
257 if NEW[rFile][
'Processed']:
# Orphan scan: runs only when the last upload is older than
# WAIT_TIME_FILE_PT seconds.
261 if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
262 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
264 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$',f)
268 runnr = fMatch.group(
"runnr")
269 subsystem=fMatch.group(
"subsys")
# NOTE(review): runnr (a string) is compared with LAST_SEEN_RUN directly here,
# whereas the earlier scan compares int() values -- confirm this is intended.
270 if runnr > LAST_SEEN_RUN:
# Run-level file name for this _T file: the text before the last "_" plus
# ".root".
273 tmpFName =
"%s/%s.root" % (dir,f.rsplit(
"_",1)[0])
274 if os.path.exists(tmpFName):
277 finalTdir =
"%s/%s/%s" % (T_FILE_DONE_DIR,runnr[0:3],runnr[3:6])
# Files already in the destination directory for this subsystem/run, ordered
# by modification time, oldest first.
278 fList = sorted(glob(
"%s/*_%s_R%s*" % (finalTdir,subsystem, runnr)),
279 cmp=
lambda x,y: cmp(os.stat(x).st_mtime,os.stat(y).st_mtime))
280 fName =
"%s/%s" % (dir,f)
281 if len(fList)
and os.stat(fList[-1]).st_mtime > os.stat(fName).st_mtime:
285 logme(
"INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
286 tmpF = open(tmpFName,
"w+")
# Sleep COLLECTOR_WAIT_TIME seconds.
290 time.sleep(COLLECTOR_WAIT_TIME)
# NOTE(review): bare signature listing only -- no colon and no bodies appear
# in this extract; the matching body fragments appear earlier without their
# "def" lines.
def processSiStrip(fName, finalTfile)
def uploadFile(fName, subsystem, run)
def convert(infile, ofile)