CMS 3D CMS Logo

fileCollector2.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 from __future__ import print_function
4 from builtins import range
5 import os,time,sys,zipfile,re,shutil,stat
6 from fcntl import lockf, LOCK_EX, LOCK_UN
7 from hashlib import md5
8 from glob import glob
9 from datetime import datetime
10 
11 COLLECTING_DIR = sys.argv[1] #Directory where to look for root files
12 T_FILE_DONE_DIR = sys.argv[2] #Directory where to place processed root files
13 DROPBOX = sys.argv[3] #Directory where the collected files are sent.
14 
15 EXEDIR = os.path.dirname(__file__)
16 COLLECTOR_WAIT_TIME = 10 # time between collector cilces
17 WAIT_TIME_FILE_PT = 60 * 15 # time to wait to pick up lost files
18 TMP_DROPBOX = os.path.join(DROPBOX,".uploading")
19 KEEP = 2 # number of _d files to keep
20 RETRIES = 3 # number of retries to sen a file
21 STOP_FILE = "%s/.stop" % EXEDIR
22 sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
23 os.environ["WorkDir"] = EXEDIR
24 
25 def logme(msg, *args):
26  procid = "[%s/%d]" % (__file__.rsplit("/", 1)[-1], os.getpid())
27  print(datetime.now(), procid, msg % args)
28 
29 def filecheck(rootfile):
30  cmd = EXEDIR + '/filechk.sh ' + rootfile
31  a = os.popen(cmd).read().split()
32  tag=a.pop()
33  if tag == '(int)(-1)':
34  logme("ERROR: File %s corrupted (isZombi)", rootfile)
35  return False
36  elif tag == '(int)0':
37  logme("ERROR: File %s is incomplete", rootfile)
38  return False
39  elif tag == '(int)1':
40  return True
41  else:
42  return False
43 
44 def isFileOpen(fName):
45  fName = os.path.realpath(fName)
46  pids=os.listdir('/proc')
47  for pid in sorted(pids):
48  try:
49  if not pid.isdigit():
50  continue
51 
52  if os.stat(os.path.join('/proc',pid)).st_uid != os.getuid():
53  continue
54 
55  uid = os.stat(os.path.join('/proc',pid)).st_uid
56  fd_dir=os.path.join('/proc', pid, 'fd')
57  if os.stat(fd_dir).st_uid != os.getuid():
58  continue
59 
60  for f in os.listdir(fd_dir):
61  fdName = os.path.join(fd_dir, f)
62  if os.path.islink(fdName) :
63  link=os.readlink(fdName)
64  if link == fName:
65  return True
66  except:
67  continue
68 
69  return False
70 
71 def convert(infile, ofile):
72  cmd = EXEDIR + '/convert.sh ' + infile + ' ' +ofile
73  os.system(cmd)
74 
75 def uploadFile(fName, subsystem, run):
76  hname = os.getenv("HOSTNAME")
77  seed=hname.replace("-","t")[-6:]
78  finalTMPfile="%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX,subsystem,run,seed)
79  if os.path.exists(finalTMPfile):
80  os.remove(finalTMPfile)
81 
82  md5Digest=md5(file(fName).read())
83  originStr="md5:%s %d %s" % (md5Digest.hexdigest(),os.stat(fName).st_size,fName)
84  originTMPFile="%s.origin" % finalTMPfile
85  originFile=open(originTMPFile,"w")
86  originFile.write(originStr)
87  originFile.close()
88  shutil.copy(fName,finalTMPfile)
89  if not os.path.exists(finalTMPfile) or not os.stat(finalTMPfile).st_size == os.stat(fName).st_size:
90  return False
91 
92  version=1
93  lFile=open("%s/lock" % TMP_DROPBOX ,"a")
94  lockf(lFile,LOCK_EX)
95  for vdir,vsubdir,vfiles in os.walk(DROPBOX):
96  if 'DQM_V0001_%s_R%s.root' % (subsystem,run) not in vfiles:
97  continue
98 
99  version += 1
100 
101  if not os.path.exists("%s/%04d" % (DROPBOX,version)):
102  os.makedirs("%s/%04d" % (DROPBOX,version))
103  os.chmod("%s/%04d" % (DROPBOX,version),2775)
104 
105  finalfile="%s/%04d/DQM_V0001_%s_R%s.root" % (DROPBOX,version,subsystem,run)
106  originFileName="%s.origin" % finalfile
107  try:
108  os.rename(finalTMPfile,finalfile)
109  os.rename(originTMPFile,originFileName)
110  os.chmod(finalfile,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
111  os.chmod(originFileName,stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH| stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)
112  except:
113  lockf(lFile,LOCK_UN)
114  lFile.close()
115  logme("ERROR: File %s upload failed to the DROPBOX" % fName)
116  return False
117 
118  logme("INFO: File %s has been successfully sent to the DROPBOX" % fName)
119  lockf(lFile,LOCK_UN)
120  lFile.close()
121  return True
122 
123 def processSiStrip(fName,finalTfile):
124  dqmfile = fName
125  if "Playback" in fName and "SiStrip" == NEW[rFile]["subSystem"]:
126  dqmfile = fName.replace('Playback','DQM')
127  convert(fName,dqmfile)
128  if not os.path.exists(dqmfile):
129  logme("ERROR: Problem converting %s skiping" % Tfile)
130  shutil.move(fName,finalTfile+"_d")
131  return (dqmfile,False)
132 
133  os.rename(fName,finalTfile.replace('Playback','Playback_full'))
134 
135  return (dqmfile,True)
136 
137 
138 NEW = {}
139 LAST_SEEN_RUN = "0"
140 LAST_FILE_UPLOADED = time.time()
141 if not os.path.exists(TMP_DROPBOX):
142  os.makedirs(TMP_DROPBOX)
143 
144 while True:
145  #Check if you need to stop.
146  if os.path.exists(STOP_FILE):
147  logme("INFO: Stop file found, quitting")
148  sys.exit(0)
149 
150  #clean up tagfiele_runend files, this should be removed as it use is deprecated
151  TAGS=sorted(glob('%s/tagfile_runend_*' % COLLECTING_DIR ),reverse=True)
152  for tag in TAGS:
153  os.remove(tag)
154 
155  for dir, subdirs, files in os.walk(COLLECTING_DIR):
156  for f in files:
157  fMatch=re.match('^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$',f)
158  if fMatch:
159  runnr = fMatch.group("runnr")
160  subsystem=fMatch.group("subsys")
161  f = "%s/%s" % (dir, f)
162  NEW.setdefault(f, {"runNumber":runnr,
163  "subSystem":subsystem,
164  "Processed":False,
165  "TFiles":[]})
166  if int(runnr) > int(LAST_SEEN_RUN):
167  LAST_SEEN_RUN = runnr
168 
169  for rFile in NEW.keys():
170  if len(NEW[rFile]["TFiles"]):
171  continue
172 
173  # Add respective T files just in case the final root file is damage
174  for dir, subdirs, files in os.walk(COLLECTING_DIR):
175  for f in files:
176  runnr = NEW[rFile]["runNumber"]
177  subsystem=NEW[rFile]["subSystem"]
178  fMatch=re.match('^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}.root$' % (
179  subsystem, runnr),f)
180  if fMatch:
181  f = "%s/%s" % (dir, f)
182  NEW[rFile]["TFiles"].append(f)
183 
184  NEW[rFile]["TFiles"].sort(reverse=True)
185 
186  #Process files
187  for rFile in NEW.keys():
188  if isFileOpen(rFile):
189  logme("INFO: File %s is open", rFile)
190  continue
191 
192  transferred = False
193  run = NEW[rFile]["runNumber"]
194  subsystem = NEW[rFile]["subSystem"]
195  finalTdir="%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
196  if not os.path.exists(finalTdir):
197  os.makedirs(finalTdir)
198 
199  if not filecheck(rFile):
200  os.rename(rFile,"%s/%s_d" % (finalTdir, os.path.basename(rFile)))
201  for Tfile in NEW[rFile]["TFiles"]:
202  finalTfile="%s/%s" % (finalTdir,os.path.basename(Tfile))
203  if transferred:
204  break
205 
206  if not filecheck(Tfile):
207  if os.path.exists(Tfile):
208  shutil.move(Tfile,finalTfile+"_d")
209  continue
210 
211  fToUpload, converted = processSiStrip(Tfile, finalTfile)
212  if not converted:
213  continue
214 
215  for i in range(RETRIES):
216  if uploadFile(fToUpload, subsystem, run):
217  NEW[rFile]["Processed"] = transferred = True
218  LAST_FILE_UPLOADED = time.time()
219  os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
220  break
221 
222  NEW[rFile]['Processed'] = True
223  continue
224 
225  finalTfile="%s/%s" % (finalTdir,os.path.basename(rFile))
226  fToUpload, converted = processSiStrip(rFile, finalTfile)
227  if not converted:
228  continue
229 
230  for i in range(RETRIES):
231  if uploadFile(fToUpload, subsystem, run):
232  NEW[rFile]["Processed"] = transferred = True
233  LAST_FILE_UPLOADED = time.time()
234  os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
235  break
236 
237  #Clean up COLLECTING_DIR
238  for rFile in NEW.keys():
239  if not NEW[rFile]["Processed"]:
240  continue
241 
242  run = NEW[rFile]["runNumber"]
243  subsystem = NEW[rFile]["subSystem"]
244  finalTdir="%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
245  for Tfile in NEW[rFile]["TFiles"]:
246  if os.path.exists(Tfile):
247  finalTfile="%s/%s_d" % (finalTdir,os.path.basename(Tfile))
248  os.rename(Tfile,finalTfile)
249 
250  #Enforce KEEPS
251  fList = sorted(glob("%s/*_%s_R%s*_d" % (finalTdir,subsystem, run)),cmp=lambda x,y: "_T" not in x and 1 or ("_T" in y and ( -1 * cmp(x,y))))
252  for f in fList[::-1]:
253  if len(fList) > KEEP:
254  fList.remove(f)
255  os.remove(f)
256 
257  #Determine if the run has been fully processed.
258  for rFile in NEW.keys():
259  if NEW[rFile]['Processed']:
260  del NEW[rFile]
261 
262  #Find and process orphan _T files.
263  if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
264  for dir, subdirs, files in os.walk(COLLECTING_DIR):
265  for f in files:
266  fMatch=re.match('^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$',f)
267  if not fMatch:
268  continue
269 
270  runnr = fMatch.group("runnr")
271  subsystem=fMatch.group("subsys")
272  if runnr > LAST_SEEN_RUN:
273  continue
274 
275  tmpFName = "%s/%s.root" % (dir,f.rsplit("_",1)[0])
276  if os.path.exists(tmpFName):
277  continue
278 
279  finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR,runnr[0:3],runnr[3:6])
280  fList = sorted(glob("%s/*_%s_R%s*" % (finalTdir,subsystem, runnr)),
281  cmp=lambda x,y: cmp(os.stat(x).st_mtime,os.stat(y).st_mtime))
282  fName = "%s/%s" % (dir,f)
283  if len(fList) and os.stat(fList[-1]).st_mtime > os.stat(fName).st_mtime:
284  os.remove(fName)
285  continue
286 
287  logme("INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
288  tmpF = open(tmpFName,"w+")
289  tmpF.close()
290  del tmpF
291 
292  time.sleep(COLLECTOR_WAIT_TIME)
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
fileCollector2.convert
def convert(infile, ofile)
Definition: fileCollector2.py:71
fileCollector2.logme
def logme(msg, *args)
Definition: fileCollector2.py:25
fileCollector2.filecheck
def filecheck(rootfile)
Definition: fileCollector2.py:29
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
fileCollector2.cmp
cmp
Definition: fileCollector2.py:251
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
mps_setup.append
append
Definition: mps_setup.py:85
createfilelist.int
int
Definition: createfilelist.py:10
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
fileCollector2.processSiStrip
def processSiStrip(fName, finalTfile)
Definition: fileCollector2.py:123
fileCollector2.uploadFile
def uploadFile(fName, subsystem, run)
Definition: fileCollector2.py:75
fileCollector2.isFileOpen
def isFileOpen(fName)
Definition: fileCollector2.py:44