3 Classes to check that a set of ROOT files are OK and publish a report 5 from __future__
import print_function
7 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
10 import eostools
as castortools
11 from timeout
import timed_out, TimedOutExc
12 from castorBaseDir
import castorBaseDir
13 from dataset
import CMSDataset
def __init__(self, parent):
    """Record who owns the reports this publisher writes.

    parent -- either a plain string (used as-is) or any object, in which
              case its class name is used as the report owner's name.
    """
    # BUGFIX/idiom: was `isinstance(parent, type(""))` — use `str` directly.
    if isinstance(parent, str):
        self.parent = parent
    else:
        self.parent = parent.__class__.__name__
def publish(self, report):
    """Write `report` (a dict) to every path in report['PathList'].

    The report is serialised to JSON in a temp file named
    '<parent>_<DateCreated>.txt' and copied to storage with xrdcp.  If the
    copy cannot be verified, the file is kept locally under a hashed name
    and a warning is printed to stderr.

    NOTE(review): reconstructed from a garbled extraction; some original
    comment/blank lines were lost — verify against the original file.
    """
    for path in report['PathList']:
        _, name = tempfile.mkstemp('.txt', text=True)
        # BUGFIX: was `file(name, 'w')` — a Python-2-only builtin, and the
        # handle was never closed.  Use open() in a context manager.
        with open(name, 'w') as handle:
            json.dump(report, handle, sort_keys=True, indent=4)

        fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
        # rename the temp file so the published copy has a meaningful name
        nname = os.path.join(os.path.dirname(name), fname)
        os.rename(name, nname)

        castor_path = castortools.lfnToCastor(path)
        new_name = '%s/%s' % (castor_path, fname)
        castortools.xrdcp(nname, path)

        if castortools.fileExists(new_name):
            print("File published: '%s'" % castortools.castorToLFN(new_name))
            # clean up the local copy once it is safely on storage
            os.remove(nname)
        else:
            # fall back to a local file whose name encodes the target path
            pathhash = path.replace('/', '.')
            hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
            shutil.move(nname, hashed_name)
            print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)
def read(self, lfn, local = False):
    """Reads a report from storage.

    lfn   -- logical file name of the report.
    local -- if True, read `lfn` directly from the local filesystem
             instead of going through castor.
    Returns the parsed JSON report (a dict).
    """
    if local:
        # BUGFIX: the local branch used the Python-2-only `file()` builtin
        # and leaked the handle; use open() in a context manager.
        with open(lfn) as handle:
            cat = handle.read()
    else:
        cat = castortools.cat(castortools.lfnToCastor(lfn))
    return json.loads(cat)
def get(self, dir):
    """Finds the latest report file published under `dir` and reads it.

    Reports are matched by the pattern '<parent>_*.txt'.  Returns the
    parsed report dict, or None when no report has been published yet.
    """
    # raw string: in a plain literal '\.' is an invalid escape (deprecated)
    reg = r'^%s_.*\.txt$' % self.parent
    files = castortools.matchingFiles(dir, reg)
    files = sorted([(os.path.basename(f), f) for f in files])
    if not files:
        # ROBUSTNESS: avoid IndexError on files[-1] when nothing matches
        return None
    return self.read(files[-1][1])
# NOTE(review): fragment of the integrity-check class constructor — it
# normalises `dataset` to an absolute path (prepends os.sep when missing).
# The rest of the constructor is lost in this garbled extraction (original
# lines 78-90), so the fragment is kept verbatim rather than rewritten.
76 if not dataset.startswith(os.sep):
77 dataset = os.sep + dataset
# NOTE(review): fragment of a DAS-query method.  Visible behaviour: imports
# BaseDataset from production_tasks, stores the dataset's name in
# self.options.name, raises when the dataset is not found in DAS, and counts
# entries via CMSDataset.findPrimaryDatasetEntries into self.eventsTotal.
# Interior lines (original 93-97, 99-101, 103) are missing from this
# extraction — kept verbatim; do not rewrite without the original.
91 """Query DAS to find out how many events are in the dataset""" 92 from production_tasks
import BaseDataset
98 self.options.name = output[
'Name']
102 raise Exception(
"Dataset '%s' not found in Das. Please check." % self.
dataset)
104 self.
eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)
def stripDuplicates(self):
    """Identify superseded CRAB retry files and missing CRAB jobs.

    Walks self.test_result (directory -> {filename: (OK, nevents)}), groups
    the good files by CRAB job id and keeps the highest retry per job.
    Returns a 3-tuple:
        good_duplicates -- {path: nevents} for superseded (earlier-retry) files
        bad_jobs        -- sorted list of job ids with no good output file
        sum_dup         -- total events contained in the superseded files

    NOTE(review): reconstructed from a garbled extraction; missing interior
    lines were inferred from the visible return statement — confirm against
    the original before relying on edge-case behaviour.
    BUGFIX: `xrange` (Python-2-only) -> `range`; `six.iteritems(d)` ->
    `d.items()` (equivalent on both Python 2 and 3).
    """
    import re

    # flatten per-directory results into {full_path: (OK, nevents)}
    filemask = {}
    for dirname, files in self.test_result.items():
        for name, status in files.items():
            fname = os.path.join(dirname, name)
            filemask[fname] = status

    def isCrabFile(name):
        # CRAB output basenames end in _<jobid>_<retry>_<hash>
        _, fname = os.path.split(name)
        base, _ = os.path.splitext(fname)
        return re.match(r".*_\d+_\d+_\w+$", base) is not None, base

    def getCrabIndex(base):
        # -> (jobid, retry), or None when the name has too few tokens
        tokens = base.split('_')
        if len(tokens) > 2:
            return (int(tokens[-3]), int(tokens[-2]))
        return None

    # group the good files by job id, remembering each file's retry count
    files = {}
    mmin = 1000000000
    mmax = -100000000
    for f in filemask:
        isCrab, base = isCrabFile(f)
        if isCrab:
            index = getCrabIndex(base)
            if index is not None:
                jobid, retry = index
                mmin = min(mmin, jobid)
                mmax = max(mmax, jobid)
                if jobid in files and filemask[f][0]:
                    files[jobid].append((retry, f))
                elif filemask[f][0]:
                    files[jobid] = [(retry, f)]

    good_duplicates = {}
    bad_jobs = set()
    sum_dup = 0
    for i in range(mmin, mmax + 1):
        if i in files:
            # highest retry wins; earlier retries are "valid duplicates"
            duplicates = sorted(files[i])
            if len(duplicates) > 1:
                for d in duplicates[:-1]:
                    good_duplicates[d[1]] = filemask[d[1]][1]
                    sum_dup += good_duplicates[d[1]]
        else:
            # no good file for this job id at all
            bad_jobs.add(i)
    return good_duplicates, sorted(list(bad_jobs)), sum_dup
# NOTE(review): garbled extraction — many interior lines (original 163-169,
# 173-178, 181, 183-185, 188-189, 194, 197-198, 201-204) are missing, so this
# fragment is kept verbatim rather than rewritten.
# Apparent intent, grounded in the visible lines: verify self.directory
# exists (raise otherwise); seed prev_results from a `previous` report's
# 'Files' map; walk each (dir, filelist), optionally narrowing with
# fnmatch against self.options.wildcard (warning to stderr when nothing
# matches); for each file build its LFN, reuse the previous (OK, num) status
# when it was OK, otherwise re-check; collect per-directory
# {filename: (OK, num)} masks into test_results keyed by the directory LFN.
160 def test(self, previous = None, timeout = -1):
161 if not castortools.fileExists(self.
directory):
162 raise Exception(
"The top level directory '%s' for this dataset does not exist" % self.
directory)
170 if previous
is not None:
171 for name, status
in six.iteritems(previous[
'Files']):
172 prev_results[name] = status
175 for dir, filelist
in six.iteritems(filesToTest):
179 if self.options.wildcard
is not None:
180 filtered = fnmatch.filter(filelist, self.options.wildcard)
182 print(
"Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.
directory), file=sys.stderr)
186 fname = os.path.join(dir, ff)
187 lfn = castortools.castorToLFN(fname)
190 if lfn
in prev_results
and prev_results[lfn][0]:
191 if self.options.printout:
192 print(
'[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=
' ')
193 OK, num = prev_results[lfn]
195 if self.options.printout:
196 print(
'[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=
' ')
199 filemask[ff] = (OK,num)
200 if self.options.printout:
205 test_results[castortools.castorToLFN(dir)] = filemask
# NOTE(review): fragment of a summary-printing method.  Visible behaviour:
# prints the DBS dataset name, then per-directory file statuses from
# self.test_result (marking entries in self.duplicates as "Valid duplicate"),
# a line when the DBS entry total is undetermined, and the bad CRAB job ids
# joined with commas.  Interior lines are missing from this extraction —
# kept verbatim; do not rewrite without the original.
217 print(
'DBS Dataset name: %s' % self.options.name)
220 for dirname, files
in six.iteritems(self.
test_result):
221 print(
'Directory: %s' % dirname)
222 for name, status
in six.iteritems(files):
223 fname = os.path.join(dirname, name)
224 if not fname
in self.duplicates:
225 print(
'\t\t %s: %s' % (name,
str(status)))
227 print(
'\t\t %s: %s (Valid duplicate)' % (name,
str(status)))
233 print(
'Total entries in DBS not determined')
235 print(
"Bad Crab Jobs: '%s'" %
','.
join([
str(j)
for j
in self.bad_jobs]))
# NOTE(review): fragment of a method that builds the structured report dict
# later handed to PublishToFileSystem.publish.  Visible keys: PrimaryDataset,
# PhysicsGroup, CreatedBy, DateCreated (epoch-seconds strftime("%s") —
# platform-dependent, works on Linux/glibc only), PathList, Files,
# PrimaryDatasetFraction, FilesGood/FilesBad/FilesCount, BadJobs,
# ValidDuplicates, MinRun, MaxRun.  Interior lines (dict literal opening,
# totalGood/totalBad computation, etc.) are missing from this extraction —
# kept verbatim; do not rewrite without the original.
247 'PrimaryDataset':self.options.name,
249 'PhysicsGroup':
'CMG',
257 'CreatedBy':self.options.user,
258 'DateCreated':datetime.datetime.now().strftime(
"%s"),
261 for dirname, files
in six.iteritems(self.
test_result):
262 report[
'PathList'].
append(dirname)
263 for name, status
in six.iteritems(files):
264 fname = os.path.join(dirname, name)
265 report[
'Files'][fname] = status
275 report[
'PrimaryDatasetFraction'] = -1.
278 report[
'FilesGood'] = totalGood
279 report[
'FilesBad'] = totalBad
280 report[
'FilesCount'] = totalGood + totalBad
282 report[
'BadJobs'] = self.bad_jobs
283 report[
'ValidDuplicates'] = self.duplicates
285 report[
'MinRun'] = self.options.min_run
286 report[
'MaxRun'] = self.options.max_run
def stageHost(self):
    """Returns the CASTOR instance to use.

    Honours an explicit STAGE_HOST environment variable, defaulting to
    'castorcms'.  NOTE(review): the `def` line was lost in extraction —
    the method name is inferred; confirm against the original file.
    """
    return os.environ.get('STAGE_HOST', 'castorcms')
295 """Recursively list a file or directory on castor""" 296 return castortools.listFiles(dir,self.options.resursive)
def listRootFiles(self, dir):
    """Filter the recursive listing of `dir` so that it only contains
    ROOT files (case-insensitive '.root' suffix)."""
    return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]
def sortByBaseDir(self, files):
    """Sort files into directories.

    Returns {dirname: [basename, ...]} preserving the input order of the
    files within each directory.
    """
    result = {}
    for path in files:
        # idiom: setdefault replaces the manual "if dirname not in result" check
        result.setdefault(os.path.dirname(path), []).append(os.path.basename(path))
    return result
def getParseNumberOfEvents(self, output):
    """Parse the output of edmFileUtil to get the number of events found.

    The event count is the 4th-from-last space-separated token; returns -2
    when the output cannot be parsed.
    """
    tokens = output.split(' ')
    try:
        return int(tokens[-4])
    except (ValueError, IndexError):
        # ROBUSTNESS: also guard IndexError for outputs with < 4 tokens
        return -2
# NOTE(review): fragment of the edmFileUtil-based file check (presumably the
# body behind testFileTimeOut / a timed_out-wrapped helper).  Visible
# behaviour: runs `edmFileUtil <lfn>` capturing stdout/stderr, returns
# (False, -1) when the output contains any known error string, and reports a
# TimedOutExc on stderr.  The try block, the success return and the
# timeout-dispatch logic are missing from this extraction — kept verbatim;
# do not rewrite without the original.
324 stdout = subprocess.Popen([
'edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE).
communicate()[0]
325 for error
in [
"Fatal Root Error",
"Could not open file",
"Not a valid collection"]:
326 if error
in stdout:
return (
False,-1)
334 except TimedOutExc
as e:
335 print(
"ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
# NOTE(review): demo entry point — builds a dummy report and reads the latest
# published report back via `pub.get(...)`.  The line constructing `pub`
# (presumably a PublishToFileSystem instance) and the publish call are lost
# in this extraction — kept verbatim; do not rewrite without the original.
344 if __name__ ==
'__main__':
347 report = {
'DateCreated':
'123456',
'PathList':[
'/store/cmst3/user/wreece']}
349 print(pub.get(
'/store/cmst3/user/wreece'))
def read(self, lfn, local=False)
def listRootFiles(self, dir)
S & print(S &os, JobReport::InputFile const &f)
static void * communicate(void *obj)
def getParseNumberOfEvents(self, output)
def __init__(self, dataset, options)
def stripDuplicates(self)
static std::string join(char **cmd)
def sortByBaseDir(self, files)
def __init__(self, parent)
def test(self, previous=None, timeout=-1)
def publish(self, report)
def testFileTimeOut(self, lfn, timeout)
How EventSelector::AcceptEvent() decides whether to accept an event for output: for a single or multiple positive criteria, the event will pass if any of the matching triggers are PASS or EXCEPTION (a criterion that matches no triggers at all is detected and causes a throw). For a single or multiple negative criteria, the event will pass if any of the matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", or "!HLTx*" if it matches 2 or more triggers) will accept the event only if all of the matching triggers are FAIL, and will reject it if any of the matching triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial-wildcard feature was incorporated). Triggers which are in the READY state are completely ignored (READY should never be returned, since the trigger paths have been run).