3 Classes to check that a set of ROOT files are OK and publish a report 5 from __future__
import print_function
6 from __future__
import absolute_import
8 from builtins
import range
9 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
12 from .
import eostools
as castortools
13 from .timeout
import timed_out, TimedOutExc
14 from .castorBaseDir
import castorBaseDir
15 from .dataset
import CMSDataset
18 """Write a report to storage""" 21 if isinstance(parent, type(
"")):
24 self.
parent = parent.__class__.__name__
28 for path
in report[
'PathList']:
29 _, name = tempfile.mkstemp(
'.txt', text=
True)
30 json.dump(report,
file(name,
'w'), sort_keys=
True, indent=4)
32 fname =
'%s_%s.txt' % (self.
parent, report[
'DateCreated'])
34 nname = os.path.join(os.path.dirname(name),fname)
35 os.rename(name, nname)
37 castor_path = castortools.lfnToCastor(path)
38 new_name =
'%s/%s' % (castor_path, fname)
39 castortools.xrdcp(nname,path)
42 if castortools.fileExists(new_name):
47 print(
"File published: '%s'" % castortools.castorToLFN(new_name))
50 pathhash = path.replace(
'/',
'.')
51 hashed_name =
'PublishToFileSystem-%s-%s' % (pathhash, fname)
52 shutil.move(nname, hashed_name)
53 print(
"Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)
55 def read(self, lfn, local = False):
56 """Reads a report from storage""" 60 cat = castortools.cat(castortools.lfnToCastor(lfn))
62 return json.loads(cat)
65 """Finds the latest file and reads it""" 66 reg =
'^%s_.*\.txt$' % self.
parent 67 files = castortools.matchingFiles(dir, reg)
68 files = sorted([ (os.path.basename(f), f)
for f
in files])
71 return self.
read(files[-1][1])
77 if not dataset.startswith(os.sep):
78 dataset = os.sep + dataset
92 """Query DAS to find out how many events are in the dataset""" 93 from .production_tasks
import BaseDataset
99 self.
options.name = output[
'Name']
103 raise Exception(
"Dataset '%s' not found in Das. Please check." % self.
dataset)
113 for name, status
in files.items():
114 fname = os.path.join(dirname, name)
115 filemask[fname] = status
def isCrabFile(name):
    """Check whether a file name matches the CRAB-style naming pattern.

    The directory part and the extension are removed from ``name``; the
    remaining base name is then matched against
    ``.*_<digits>_<digits>_<word characters>`` anchored at its end.

    Args:
        name: A file name or path.

    Returns:
        A tuple ``(matched, base)`` where ``matched`` is True when the base
        name fits the pattern and ``base`` is the file name stripped of its
        directory and extension.
    """
    _, fname = os.path.split(name)
    base, _ = os.path.splitext(fname)
    # Raw string: "\d" and "\w" are invalid escapes in an ordinary literal
    # and trigger a DeprecationWarning/SyntaxWarning on recent Pythons.
    return re.match(r".*_\d+_\d+_\w+$", base) is not None, base
121 def getCrabIndex(base):
122 tokens = base.split(
'_')
124 return (
int(tokens[-3]),
int(tokens[-2]))
132 isCrab, base = isCrabFile(f)
134 index = getCrabIndex(base)
135 if index
is not None:
138 mmin =
min(mmin, jobid)
139 mmax =
max(mmax, jobid)
140 if jobid
in files
and filemask[f][0]:
141 files[jobid].
append((retry, f))
143 files[jobid] = [(retry, f)]
148 for i
in range(mmin, mmax+1):
150 duplicates = sorted(files[i])
152 fname = duplicates[-1][1]
153 if len(duplicates) > 1:
154 for d
in duplicates[:-1]:
155 good_duplicates[d[1]] = filemask[d[1]][1]
156 sum_dup += good_duplicates[d[1]]
159 return good_duplicates, sorted(list(bad_jobs)), sum_dup
161 def test(self, previous = None, timeout = -1):
162 if not castortools.fileExists(self.
directory):
163 raise Exception(
"The top level directory '%s' for this dataset does not exist" % self.
directory)
171 if previous
is not None:
172 for name, status
in previous[
'Files'].
items():
173 prev_results[name] = status
176 for dir, filelist
in filesToTest.items():
180 if self.
options.wildcard
is not None:
181 filtered = fnmatch.filter(filelist, self.
options.wildcard)
183 print(
"Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.
options.wildcard,self.
directory), file=sys.stderr)
187 fname = os.path.join(dir, ff)
188 lfn = castortools.castorToLFN(fname)
191 if lfn
in prev_results
and prev_results[lfn][0]:
193 print(
'[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=
' ')
194 OK, num = prev_results[lfn]
197 print(
'[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=
' ')
200 filemask[ff] = (OK,num)
206 test_results[castortools.castorToLFN(dir)] = filemask
222 print(
'Directory: %s' % dirname)
223 for name, status
in files.items():
224 fname = os.path.join(dirname, name)
225 if not fname
in self.duplicates:
226 print(
'\t\t %s: %s' % (name,
str(status)))
228 print(
'\t\t %s: %s (Valid duplicate)' % (name,
str(status)))
234 print(
'Total entries in DBS not determined')
236 print(
"Bad Crab Jobs: '%s'" %
','.
join([
str(j)
for j
in self.bad_jobs]))
248 'PrimaryDataset':self.
options.name,
250 'PhysicsGroup':
'CMG',
259 'DateCreated':datetime.datetime.now().strftime(
"%s"),
263 report[
'PathList'].
append(dirname)
264 for name, status
in files.items():
265 fname = os.path.join(dirname, name)
266 report[
'Files'][fname] = status
276 report[
'PrimaryDatasetFraction'] = -1.
279 report[
'FilesGood'] = totalGood
280 report[
'FilesBad'] = totalBad
281 report[
'FilesCount'] = totalGood + totalBad
283 report[
'BadJobs'] = self.bad_jobs
284 report[
'ValidDuplicates'] = self.duplicates
286 report[
'MinRun'] = self.
options.min_run
287 report[
'MaxRun'] = self.
options.max_run
292 """Returns the CASTOR instance to use""" 293 return os.environ.get(
'STAGE_HOST',
'castorcms')
296 """Recursively list a file or directory on castor""" 297 return castortools.listFiles(dir,self.
options.resursive)
300 """filter out filenames so that they only contain root files""" 301 return [f
for f
in self.
listFiles(dir)
if f.lower().endswith(
'.root')]
304 """Sort files into directories""" 307 dirname = os.path.dirname(f)
308 filename = os.path.basename(f)
309 if dirname
not in result: result[dirname] = []
310 result[dirname].
append(filename)
315 """Parse the output of edmFileUtil to get the number of events found""" 316 tokens = output.split(
' ')
319 result =
int(tokens[-4])
325 stdout = subprocess.Popen([
'edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE).
communicate()[0]
326 for error
in [
"Fatal Root Error",
"Could not open file",
"Not a valid collection"]:
327 if error
in stdout:
return (
False,-1)
335 except TimedOutExc
as e:
336 print(
"ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
345 if __name__ ==
'__main__':
348 report = {
'DateCreated':
'123456',
'PathList':[
'/store/cmst3/user/wreece']}
350 print(pub.get(
'/store/cmst3/user/wreece'))
def read(self, lfn, local=False)
def listRootFiles(self, dir)
static void * communicate(void *obj)
def getParseNumberOfEvents(self, output)
def __init__(self, dataset, options)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def split(sequence, size)
def stripDuplicates(self)
static std::string join(char **cmd)
def sortByBaseDir(self, files)
def __init__(self, parent)
def test(self, previous=None, timeout=-1)
def publish(self, report)
def testFileTimeOut(self, lfn, timeout)