3 Classes to check that a set of ROOT files are OK and publish a report
5 from __future__
import print_function
6 from __future__
import absolute_import
8 from builtins
import range
9 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
12 from .
import eostools
as castortools
13 from .timeout
import timed_out, TimedOutExc
14 from .castorBaseDir
import castorBaseDir
15 from .dataset
import CMSDataset
19 """Write a report to storage"""
22 if isinstance(parent, type(
"")):
25 self.
parent = parent.__class__.__name__
29 for path
in report[
'PathList']:
30 _, name = tempfile.mkstemp(
'.txt', text=
True)
31 json.dump(report,
file(name,
'w'), sort_keys=
True, indent=4)
33 fname =
'%s_%s.txt' % (self.
parent, report[
'DateCreated'])
35 nname = os.path.join(os.path.dirname(name),fname)
36 os.rename(name, nname)
38 castor_path = castortools.lfnToCastor(path)
39 new_name =
'%s/%s' % (castor_path, fname)
40 castortools.xrdcp(nname,path)
43 if castortools.fileExists(new_name):
48 print(
"File published: '%s'" % castortools.castorToLFN(new_name))
51 pathhash = path.replace(
'/',
'.')
52 hashed_name =
'PublishToFileSystem-%s-%s' % (pathhash, fname)
53 shutil.move(nname, hashed_name)
54 print(
"Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)
56 def read(self, lfn, local = False):
57 """Reads a report from storage"""
61 cat = castortools.cat(castortools.lfnToCastor(lfn))
63 return json.loads(cat)
66 """Finds the latest file and reads it"""
67 reg =
'^%s_.*\.txt$' % self.
parent
68 files = castortools.matchingFiles(dir, reg)
69 files = sorted([ (os.path.basename(f), f)
for f
in files])
72 return self.
read(files[-1][1])
78 if not dataset.startswith(os.sep):
79 dataset = os.sep + dataset
93 """Query DAS to find out how many events are in the dataset"""
94 from .production_tasks
import BaseDataset
100 self.
options.name = output[
'Name']
104 raise Exception(
"Dataset '%s' not found in Das. Please check." % self.
dataset)
113 for dirname, files
in six.iteritems(self.
test_result):
114 for name, status
in six.iteritems(files):
115 fname = os.path.join(dirname, name)
116 filemask[fname] = status
def isCrabFile(name):
    """Tell whether the base name of ``name`` ends in ``_<digits>_<digits>_<word>``.

    The directory part and the file extension of ``name`` are dropped before
    matching.

    Returns a tuple ``(matches, base)``: ``matches`` is True when the stripped
    base name fits the pattern, and ``base`` is that stripped name, returned
    in both cases so the caller can parse it further.
    """
    base, _ = os.path.splitext(os.path.basename(name))
    # Raw string: '\d' and '\w' are invalid escape sequences in a plain
    # literal and trigger a warning on current Python versions. The pattern
    # seen by the regex engine is unchanged.
    return re.match(r".*_\d+_\d+_\w+$", base) is not None, base
122 def getCrabIndex(base):
123 tokens = base.split(
'_')
125 return (
int(tokens[-3]),
int(tokens[-2]))
133 isCrab, base = isCrabFile(f)
135 index = getCrabIndex(base)
136 if index
is not None:
139 mmin =
min(mmin, jobid)
140 mmax =
max(mmax, jobid)
141 if jobid
in files
and filemask[f][0]:
142 files[jobid].
append((retry, f))
144 files[jobid] = [(retry, f)]
149 for i
in range(mmin, mmax+1):
151 duplicates = sorted(files[i])
153 fname = duplicates[-1][1]
154 if len(duplicates) > 1:
155 for d
in duplicates[:-1]:
156 good_duplicates[d[1]] = filemask[d[1]][1]
157 sum_dup += good_duplicates[d[1]]
160 return good_duplicates, sorted(
list(bad_jobs)), sum_dup
162 def test(self, previous = None, timeout = -1):
163 if not castortools.fileExists(self.
directory):
164 raise Exception(
"The top level directory '%s' for this dataset does not exist" % self.
directory)
172 if previous
is not None:
173 for name, status
in six.iteritems(previous[
'Files']):
174 prev_results[name] = status
177 for dir, filelist
in six.iteritems(filesToTest):
181 if self.
options.wildcard
is not None:
182 filtered = fnmatch.filter(filelist, self.
options.wildcard)
184 print(
"Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.
options.wildcard,self.
directory), file=sys.stderr)
188 fname = os.path.join(dir, ff)
189 lfn = castortools.castorToLFN(fname)
192 if lfn
in prev_results
and prev_results[lfn][0]:
194 print(
'[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=
' ')
195 OK, num = prev_results[lfn]
198 print(
'[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=
' ')
201 filemask[ff] = (OK,num)
207 test_results[castortools.castorToLFN(dir)] = filemask
222 for dirname, files
in six.iteritems(self.
test_result):
223 print(
'Directory: %s' % dirname)
224 for name, status
in six.iteritems(files):
225 fname = os.path.join(dirname, name)
226 if not fname
in self.duplicates:
227 print(
'\t\t %s: %s' % (name,
str(status)))
229 print(
'\t\t %s: %s (Valid duplicate)' % (name,
str(status)))
235 print(
'Total entries in DBS not determined')
237 print(
"Bad Crab Jobs: '%s'" %
','.
join([
str(j)
for j
in self.bad_jobs]))
249 'PrimaryDataset':self.
options.name,
251 'PhysicsGroup':
'CMG',
260 'DateCreated':datetime.datetime.now().strftime(
"%s"),
263 for dirname, files
in six.iteritems(self.
test_result):
264 report[
'PathList'].
append(dirname)
265 for name, status
in six.iteritems(files):
266 fname = os.path.join(dirname, name)
267 report[
'Files'][fname] = status
277 report[
'PrimaryDatasetFraction'] = -1.
280 report[
'FilesGood'] = totalGood
281 report[
'FilesBad'] = totalBad
282 report[
'FilesCount'] = totalGood + totalBad
284 report[
'BadJobs'] = self.bad_jobs
285 report[
'ValidDuplicates'] = self.duplicates
287 report[
'MinRun'] = self.
options.min_run
288 report[
'MaxRun'] = self.
options.max_run
293 """Returns the CASTOR instance to use"""
294 return os.environ.get(
'STAGE_HOST',
'castorcms')
297 """Recursively list a file or directory on castor"""
298 return castortools.listFiles(dir,self.
options.resursive)
301 """filter out filenames so that they only contain root files"""
302 return [f
for f
in self.
listFiles(dir)
if f.lower().endswith(
'.root')]
305 """Sort files into directories"""
308 dirname = os.path.dirname(f)
309 filename = os.path.basename(f)
310 if dirname
not in result: result[dirname] = []
311 result[dirname].
append(filename)
316 """Parse the output of edmFileUtil to get the number of events found"""
317 tokens = output.split(
' ')
320 result =
int(tokens[-4])
326 stdout = subprocess.Popen([
'edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE).
communicate()[0]
327 for error
in [
"Fatal Root Error",
"Could not open file",
"Not a valid collection"]:
328 if error
in stdout:
return (
False,-1)
336 except TimedOutExc
as e:
337 print(
"ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
346 if __name__ ==
'__main__':
349 report = {
'DateCreated':
'123456',
'PathList':[
'/store/cmst3/user/wreece']}
351 print(pub.get(
'/store/cmst3/user/wreece'))