3 from __future__
import print_function
4 from builtins
import range
5 import os,time,sys,zipfile,re,shutil,stat
6 from fcntl
import lockf, LOCK_EX, LOCK_UN
7 from hashlib
import md5
9 from datetime
import datetime
11 COLLECTING_DIR = sys.argv[1]
12 T_FILE_DONE_DIR = sys.argv[2]
15 EXEDIR = os.path.dirname(__file__)
16 COLLECTOR_WAIT_TIME = 10
17 WAIT_TIME_FILE_PT = 60 * 15
18 TMP_DROPBOX = os.path.join(DROPBOX,
".uploading")
21 STOP_FILE =
"%s/.stop" % EXEDIR
22 sys.stdout = os.fdopen(sys.stdout.fileno(),
'w', 0)
23 os.environ[
"WorkDir"] = EXEDIR
26 procid =
"[%s/%d]" % (__file__.rsplit(
"/", 1)[-1], os.getpid())
27 print(datetime.now(), procid, msg % args)
30 cmd = EXEDIR +
'/filechk.sh ' + rootfile
33 if tag ==
'(int)(-1)':
34 logme(
"ERROR: File %s corrupted (isZombi)", rootfile)
37 logme(
"ERROR: File %s is incomplete", rootfile)
45 fName = os.path.realpath(fName)
46 pids=os.listdir(
'/proc')
47 for pid
in sorted(pids):
52 if os.stat(os.path.join(
'/proc',pid)).st_uid != os.getuid():
55 uid = os.stat(os.path.join(
'/proc',pid)).st_uid
56 fd_dir=os.path.join(
'/proc', pid,
'fd')
57 if os.stat(fd_dir).st_uid != os.getuid():
60 for f
in os.listdir(fd_dir):
61 fdName = os.path.join(fd_dir, f)
62 if os.path.islink(fdName) :
63 link=os.readlink(fdName)
72 cmd = EXEDIR +
'/convert.sh ' + infile +
' ' +ofile
76 hname = os.getenv(
"HOSTNAME")
77 seed=hname.replace(
"-",
"t")[-6:]
78 finalTMPfile=
"%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX,subsystem,run,seed)
79 if os.path.exists(finalTMPfile):
80 os.remove(finalTMPfile)
83 originStr=
"md5:%s %d %s" % (md5Digest.hexdigest(),os.stat(fName).st_size,fName)
84 originTMPFile=
"%s.origin" % finalTMPfile
85 originFile=open(originTMPFile,
"w")
86 originFile.write(originStr)
88 shutil.copy(fName,finalTMPfile)
89 if not os.path.exists(finalTMPfile)
or not os.stat(finalTMPfile).st_size == os.stat(fName).st_size:
93 lFile=open(
"%s/lock" % TMP_DROPBOX ,
"a")
95 for vdir,vsubdir,vfiles
in os.walk(DROPBOX):
96 if 'DQM_V0001_%s_R%s.root' % (subsystem,run)
not in vfiles:
101 if not os.path.exists(
"%s/%04d" % (DROPBOX,version)):
102 os.makedirs(
"%s/%04d" % (DROPBOX,version))
103 os.chmod(
"%s/%04d" % (DROPBOX,version),2775)
105 finalfile=
"%s/%04d/DQM_V0001_%s_R%s.root" % (DROPBOX,version,subsystem,run)
106 originFileName=
"%s.origin" % finalfile
108 os.rename(finalTMPfile,finalfile)
109 os.rename(originTMPFile,originFileName)
110 os.chmod(finalfile,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
111 os.chmod(originFileName,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
115 logme(
"ERROR: File %s upload failed to the DROPBOX" % fName)
118 logme(
"INFO: File %s has been successfully sent to the DROPBOX" % fName)
125 if "Playback" in fName
and "SiStrip" == NEW[rFile][
"subSystem"]:
126 dqmfile = fName.replace(
'Playback',
'DQM')
128 if not os.path.exists(dqmfile):
129 logme(
"ERROR: Problem converting %s skiping" % Tfile)
130 shutil.move(fName,finalTfile+
"_d")
131 return (dqmfile,
False)
133 os.rename(fName,finalTfile.replace(
'Playback',
'Playback_full'))
135 return (dqmfile,
True)
140 LAST_FILE_UPLOADED = time.time()
141 if not os.path.exists(TMP_DROPBOX):
142 os.makedirs(TMP_DROPBOX)
146 if os.path.exists(STOP_FILE):
147 logme(
"INFO: Stop file found, quitting")
151 TAGS=sorted(glob(
'%s/tagfile_runend_*' % COLLECTING_DIR ),reverse=
True)
155 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
157 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$',f)
159 runnr = fMatch.group(
"runnr")
160 subsystem=fMatch.group(
"subsys")
161 f =
"%s/%s" % (dir, f)
162 NEW.setdefault(f, {
"runNumber":runnr,
163 "subSystem":subsystem,
166 if int(runnr) >
int(LAST_SEEN_RUN):
167 LAST_SEEN_RUN = runnr
169 for rFile
in NEW.keys():
170 if len(NEW[rFile][
"TFiles"]):
174 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
176 runnr = NEW[rFile][
"runNumber"]
177 subsystem=NEW[rFile][
"subSystem"]
178 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}.root$' % (
181 f =
"%s/%s" % (dir, f)
182 NEW[rFile][
"TFiles"].
append(f)
184 NEW[rFile][
"TFiles"].
sort(reverse=
True)
187 for rFile
in NEW.keys():
189 logme(
"INFO: File %s is open", rFile)
193 run = NEW[rFile][
"runNumber"]
194 subsystem = NEW[rFile][
"subSystem"]
195 finalTdir=
"%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
196 if not os.path.exists(finalTdir):
197 os.makedirs(finalTdir)
200 os.rename(rFile,
"%s/%s_d" % (finalTdir, os.path.basename(rFile)))
201 for Tfile
in NEW[rFile][
"TFiles"]:
202 finalTfile=
"%s/%s" % (finalTdir,os.path.basename(Tfile))
207 if os.path.exists(Tfile):
208 shutil.move(Tfile,finalTfile+
"_d")
215 for i
in range(RETRIES):
217 NEW[rFile][
"Processed"] = transferred =
True
218 LAST_FILE_UPLOADED = time.time()
219 os.rename(fToUpload,
"%s/%s" % (finalTdir, os.path.basename(fToUpload)))
222 NEW[rFile][
'Processed'] =
True
225 finalTfile=
"%s/%s" % (finalTdir,os.path.basename(rFile))
230 for i
in range(RETRIES):
232 NEW[rFile][
"Processed"] = transferred =
True
233 LAST_FILE_UPLOADED = time.time()
234 os.rename(fToUpload,
"%s/%s" % (finalTdir, os.path.basename(fToUpload)))
238 for rFile
in NEW.keys():
239 if not NEW[rFile][
"Processed"]:
242 run = NEW[rFile][
"runNumber"]
243 subsystem = NEW[rFile][
"subSystem"]
244 finalTdir=
"%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
245 for Tfile
in NEW[rFile][
"TFiles"]:
246 if os.path.exists(Tfile):
247 finalTfile=
"%s/%s_d" % (finalTdir,os.path.basename(Tfile))
248 os.rename(Tfile,finalTfile)
251 fList = sorted(glob(
"%s/*_%s_R%s*_d" % (finalTdir,subsystem, run)),cmp=
lambda x,y:
"_T" not in x
and 1
or (
"_T" in y
and ( -1 *
cmp(x,y))))
252 for f
in fList[::-1]:
253 if len(fList) > KEEP:
258 for rFile
in NEW.keys():
259 if NEW[rFile][
'Processed']:
263 if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
264 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
266 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$',f)
270 runnr = fMatch.group(
"runnr")
271 subsystem=fMatch.group(
"subsys")
272 if runnr > LAST_SEEN_RUN:
275 tmpFName =
"%s/%s.root" % (dir,f.rsplit(
"_",1)[0])
276 if os.path.exists(tmpFName):
279 finalTdir =
"%s/%s/%s" % (T_FILE_DONE_DIR,runnr[0:3],runnr[3:6])
280 fList = sorted(glob(
"%s/*_%s_R%s*" % (finalTdir,subsystem, runnr)),
281 cmp=
lambda x,y:
cmp(os.stat(x).st_mtime,os.stat(y).st_mtime))
282 fName =
"%s/%s" % (dir,f)
283 if len(fList)
and os.stat(fList[-1]).st_mtime > os.stat(fName).st_mtime:
287 logme(
"INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
288 tmpF = open(tmpFName,
"w+")
292 time.sleep(COLLECTOR_WAIT_TIME)