3 from __future__
import print_function
4 import os,time,sys,zipfile,re,shutil,stat
5 from fcntl
import lockf, LOCK_EX, LOCK_UN
6 from hashlib
import md5
8 from datetime
import datetime
10 COLLECTING_DIR = sys.argv[1]
11 T_FILE_DONE_DIR = sys.argv[2]
14 EXEDIR = os.path.dirname(__file__)
15 COLLECTOR_WAIT_TIME = 10
16 WAIT_TIME_FILE_PT = 60 * 15
17 TMP_DROPBOX = os.path.join(DROPBOX,
".uploading")
20 STOP_FILE =
"%s/.stop" % EXEDIR
21 sys.stdout = os.fdopen(sys.stdout.fileno(),
'w', 0)
22 os.environ[
"WorkDir"] = EXEDIR
25 procid =
"[%s/%d]" % (__file__.rsplit(
"/", 1)[-1], os.getpid())
26 print(datetime.now(), procid, msg % args)
29 cmd = EXEDIR +
'/filechk.sh ' + rootfile
30 a = os.popen(cmd).read().
split()
32 if tag ==
'(int)(-1)':
33 logme(
"ERROR: File %s corrupted (isZombi)", rootfile)
36 logme(
"ERROR: File %s is incomplete", rootfile)
44 fName = os.path.realpath(fName)
45 pids=os.listdir(
'/proc')
46 for pid
in sorted(pids):
51 if os.stat(os.path.join(
'/proc',pid)).st_uid != os.getuid():
54 uid = os.stat(os.path.join(
'/proc',pid)).st_uid
55 fd_dir=os.path.join(
'/proc', pid,
'fd')
56 if os.stat(fd_dir).st_uid != os.getuid():
59 for f
in os.listdir(fd_dir):
60 fdName = os.path.join(fd_dir, f)
61 if os.path.islink(fdName) :
62 link=os.readlink(fdName)
71 cmd = EXEDIR +
'/convert.sh ' + infile +
' ' +ofile
75 hname = os.getenv(
"HOSTNAME")
76 seed=hname.replace(
"-",
"t")[-6:]
77 finalTMPfile=
"%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX,subsystem,run,seed)
78 if os.path.exists(finalTMPfile):
79 os.remove(finalTMPfile)
81 md5Digest=md5(
file(fName).read())
82 originStr=
"md5:%s %d %s" % (md5Digest.hexdigest(),os.stat(fName).st_size,fName)
83 originTMPFile=
"%s.origin" % finalTMPfile
84 originFile=open(originTMPFile,
"w")
85 originFile.write(originStr)
87 shutil.copy(fName,finalTMPfile)
88 if not os.path.exists(finalTMPfile)
or not os.stat(finalTMPfile).st_size == os.stat(fName).st_size:
92 lFile=open(
"%s/lock" % TMP_DROPBOX ,
"a")
94 for vdir,vsubdir,vfiles
in os.walk(DROPBOX):
95 if 'DQM_V0001_%s_R%s.root' % (subsystem,run)
not in vfiles:
100 if not os.path.exists(
"%s/%04d" % (DROPBOX,version)):
101 os.makedirs(
"%s/%04d" % (DROPBOX,version))
102 os.chmod(
"%s/%04d" % (DROPBOX,version),2775)
104 finalfile=
"%s/%04d/DQM_V0001_%s_R%s.root" % (DROPBOX,version,subsystem,run)
105 originFileName=
"%s.origin" % finalfile
107 os.rename(finalTMPfile,finalfile)
108 os.rename(originTMPFile,originFileName)
109 os.chmod(finalfile,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
110 os.chmod(originFileName,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
114 logme(
"ERROR: File %s upload failed to the DROPBOX" % fName)
117 logme(
"INFO: File %s has been successfully sent to the DROPBOX" % fName)
124 if "Playback" in fName
and "SiStrip" == NEW[rFile][
"subSystem"]:
125 dqmfile = fName.replace(
'Playback',
'DQM')
127 if not os.path.exists(dqmfile):
128 logme(
"ERROR: Problem converting %s skiping" % Tfile)
129 shutil.move(fName,finalTfile+
"_d")
130 return (dqmfile,
False)
132 os.rename(fName,finalTfile.replace(
'Playback',
'Playback_full'))
134 return (dqmfile,
True)
139 LAST_FILE_UPLOADED = time.time()
140 if not os.path.exists(TMP_DROPBOX):
141 os.makedirs(TMP_DROPBOX)
145 if os.path.exists(STOP_FILE):
146 logme(
"INFO: Stop file found, quitting")
150 TAGS=sorted(glob(
'%s/tagfile_runend_*' % COLLECTING_DIR ),reverse=
True)
154 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
156 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$',f)
158 runnr = fMatch.group(
"runnr")
159 subsystem=fMatch.group(
"subsys")
160 f =
"%s/%s" % (dir, f)
161 NEW.setdefault(f, {
"runNumber":runnr,
162 "subSystem":subsystem,
165 if int(runnr) >
int(LAST_SEEN_RUN):
166 LAST_SEEN_RUN = runnr
168 for rFile
in NEW.keys():
169 if len(NEW[rFile][
"TFiles"]):
173 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
175 runnr = NEW[rFile][
"runNumber"]
176 subsystem=NEW[rFile][
"subSystem"]
177 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}.root$' % (
180 f =
"%s/%s" % (dir, f)
181 NEW[rFile][
"TFiles"].
append(f)
183 NEW[rFile][
"TFiles"].sort(reverse=
True)
186 for rFile
in NEW.keys():
188 logme(
"INFO: File %s is open", rFile)
192 run = NEW[rFile][
"runNumber"]
193 subsystem = NEW[rFile][
"subSystem"]
194 finalTdir=
"%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
195 if not os.path.exists(finalTdir):
196 os.makedirs(finalTdir)
199 os.rename(rFile,
"%s/%s_d" % (finalTdir, os.path.basename(rFile)))
200 for Tfile
in NEW[rFile][
"TFiles"]:
201 finalTfile=
"%s/%s" % (finalTdir,os.path.basename(Tfile))
206 if os.path.exists(Tfile):
207 shutil.move(Tfile,finalTfile+
"_d")
214 for i
in range(RETRIES):
216 NEW[rFile][
"Processed"] = transferred =
True 217 LAST_FILE_UPLOADED = time.time()
218 os.rename(fToUpload,
"%s/%s" % (finalTdir, os.path.basename(fToUpload)))
221 NEW[rFile][
'Processed'] =
True 224 finalTfile=
"%s/%s" % (finalTdir,os.path.basename(rFile))
229 for i
in range(RETRIES):
231 NEW[rFile][
"Processed"] = transferred =
True 232 LAST_FILE_UPLOADED = time.time()
233 os.rename(fToUpload,
"%s/%s" % (finalTdir, os.path.basename(fToUpload)))
237 for rFile
in NEW.keys():
238 if not NEW[rFile][
"Processed"]:
241 run = NEW[rFile][
"runNumber"]
242 subsystem = NEW[rFile][
"subSystem"]
243 finalTdir=
"%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
244 for Tfile
in NEW[rFile][
"TFiles"]:
245 if os.path.exists(Tfile):
246 finalTfile=
"%s/%s_d" % (finalTdir,os.path.basename(Tfile))
247 os.rename(Tfile,finalTfile)
250 fList = sorted(glob(
"%s/*_%s_R%s*_d" % (finalTdir,subsystem, run)),cmp=
lambda x,y:
"_T" not in x
and 1
or (
"_T" in y
and ( -1 * cmp(x,y))))
251 for f
in fList[::-1]:
252 if len(fList) > KEEP:
257 for rFile
in NEW.keys():
258 if NEW[rFile][
'Processed']:
262 if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
263 for dir, subdirs, files
in os.walk(COLLECTING_DIR):
265 fMatch=re.match(
'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$',f)
269 runnr = fMatch.group(
"runnr")
270 subsystem=fMatch.group(
"subsys")
271 if runnr > LAST_SEEN_RUN:
274 tmpFName =
"%s/%s.root" % (dir,f.rsplit(
"_",1)[0])
275 if os.path.exists(tmpFName):
278 finalTdir =
"%s/%s/%s" % (T_FILE_DONE_DIR,runnr[0:3],runnr[3:6])
279 fList = sorted(glob(
"%s/*_%s_R%s*" % (finalTdir,subsystem, runnr)),
280 cmp=
lambda x,y: cmp(os.stat(x).st_mtime,os.stat(y).st_mtime))
281 fName =
"%s/%s" % (dir,f)
282 if len(fList)
and os.stat(fList[-1]).st_mtime > os.stat(fName).st_mtime:
286 logme(
"INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
287 tmpF = open(tmpFName,
"w+")
291 time.sleep(COLLECTOR_WAIT_TIME)
def processSiStrip(fName, finalTfile)
S & print(S &os, JobReport::InputFile const &f)
def uploadFile(fName, subsystem, run)
def convert(infile, ofile)