3 Classes to check that a set of ROOT files are OK and publish a report 6 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
9 import eostools
as castortools
10 from timeout
import timed_out, TimedOutExc
11 from castorBaseDir
import castorBaseDir
12 from dataset
import CMSDataset
16 """Write a report to storage""" 19 if isinstance(parent, type(
"")):
22 self.
parent = parent.__class__.__name__
26 for path
in report[
'PathList']:
27 _, name = tempfile.mkstemp(
'.txt', text=
True)
28 json.dump(report,
file(name,
'w'), sort_keys=
True, indent=4)
30 fname =
'%s_%s.txt' % (self.
parent, report[
'DateCreated'])
32 nname = os.path.join(os.path.dirname(name),fname)
33 os.rename(name, nname)
35 castor_path = castortools.lfnToCastor(path)
36 new_name =
'%s/%s' % (castor_path, fname)
37 castortools.xrdcp(nname,path)
40 if castortools.fileExists(new_name):
45 print "File published: '%s'" % castortools.castorToLFN(new_name)
48 pathhash = path.replace(
'/',
'.')
49 hashed_name =
'PublishToFileSystem-%s-%s' % (pathhash, fname)
50 shutil.move(nname, hashed_name)
51 print >> sys.stderr,
"Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name)
def read(self, lfn, local=False):
    """Reads a report from storage.

    When local is True, lfn is treated as an ordinary local file path;
    otherwise the file is fetched through castortools.  Returns the
    decoded JSON object.
    """
    if local:
        # FIX: the original used file(lfn).read(), leaking the handle;
        # read through a context manager instead.
        with open(lfn) as fh:
            cat = fh.read()
    else:
        cat = castortools.cat(castortools.lfnToCastor(lfn))
    return json.loads(cat)
63 """Finds the lastest file and reads it""" 64 reg =
'^%s_.*\.txt$' % self.
parent 65 files = castortools.matchingFiles(dir, reg)
66 files = sorted([ (os.path.basename(f), f)
for f
in files])
69 return self.
read(files[-1][1])
def __init__(self, dataset, options):
    """Record the dataset (normalised to a leading separator) and options.

    NOTE(review): reconstructed from a garbled chunk — only the dataset
    normalisation is directly visible here; the remaining attribute
    setup follows the attributes the other methods read.
    """
    if not dataset.startswith(os.sep):
        dataset = os.sep + dataset
    self.dataset = dataset
    self.options = options
    # storage locations derived from the user's base directory
    self.topdir = castortools.lfnToCastor(castorBaseDir(user=options.user))
    self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))
    # event counters filled in by query()/test()
    self.eventsTotal = -1
    self.eventsSeen = 0
    self.test_result = None
90 """Query DAS to find out how many events are in the dataset""" 91 from production_tasks
import BaseDataset
97 self.options.name = output[
'Name']
101 raise Exception(
"Dataset '%s' not found in Das. Please check." % self.
dataset)
103 self.
eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)
def stripDuplicates(self):
    """Identify superseded crab retries and missing job indices.

    Returns (good_duplicates, bad_jobs, sum_dup):
      good_duplicates - {file name: event count} for good files that were
                        superseded by a later retry of the same job
      bad_jobs        - sorted list of job indices with no good output
      sum_dup         - total events contained in the superseded files
    """
    import re

    # flatten {dir: {file: (ok, nevents)}} into {fullpath: (ok, nevents)}
    filemask = {}
    for dirname, files in self.test_result.items():
        for name, status in files.items():
            filemask[os.path.join(dirname, name)] = status

    def isCrabFile(name):
        # crab output files end with _<jobid>_<retry>_<hash>
        _, fname = os.path.split(name)
        base, _ = os.path.splitext(fname)
        # FIX: raw string for the regex (avoids invalid escape warnings)
        return re.match(r".*_\d+_\d+_\w+$", base) is not None, base

    def getCrabIndex(base):
        # return (jobid, retry) extracted from the file base name
        tokens = base.split('_')
        if len(tokens) > 2:
            return (int(tokens[-3]), int(tokens[-2]))
        return None

    files = {}
    mmin = 1000000000
    mmax = -100000000
    for f in filemask:
        isCrab, base = isCrabFile(f)
        if isCrab:
            index = getCrabIndex(base)
            if index is not None:
                jobid, retry = index
                mmin = min(mmin, jobid)
                mmax = max(mmax, jobid)
                # only track files that passed the integrity check
                if jobid in files and filemask[f][0]:
                    files[jobid].append((retry, f))
                elif filemask[f][0]:
                    files[jobid] = [(retry, f)]

    good_duplicates = {}
    bad_jobs = set()
    sum_dup = 0
    for i in range(mmin, mmax + 1):
        if i in files:
            # the highest retry wins; earlier retries are valid duplicates
            duplicates = sorted(files[i])
            if len(duplicates) > 1:
                for d in duplicates[:-1]:
                    good_duplicates[d[1]] = filemask[d[1]][1]
                    sum_dup += good_duplicates[d[1]]
        else:
            bad_jobs.add(i)
    return good_duplicates, sorted(list(bad_jobs)), sum_dup
159 def test(self, previous = None, timeout = -1):
160 if not castortools.fileExists(self.
directory):
161 raise Exception(
"The top level directory '%s' for this dataset does not exist" % self.
directory)
169 if previous
is not None:
170 for name, status
in six.iteritems(previous[
'Files']):
171 prev_results[name] = status
174 for dir, filelist
in six.iteritems(filesToTest):
178 if self.options.wildcard
is not None:
179 filtered = fnmatch.filter(filelist, self.options.wildcard)
181 print >> sys.stderr,
"Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.
directory)
185 fname = os.path.join(dir, ff)
186 lfn = castortools.castorToLFN(fname)
189 if lfn
in prev_results
and prev_results[lfn][0]:
190 if self.options.printout:
191 print '[%i/%i]\t Skipping %s...' % (count, len(filtered),fname),
192 OK, num = prev_results[lfn]
194 if self.options.printout:
195 print '[%i/%i]\t Checking %s...' % (count, len(filtered),fname),
198 filemask[ff] = (OK,num)
199 if self.options.printout:
204 test_results[castortools.castorToLFN(dir)] = filemask
216 print 'DBS Dataset name: %s' % self.options.name
217 print 'Storage path: %s' % self.
topdir 219 for dirname, files
in six.iteritems(self.
test_result):
220 print 'Directory: %s' % dirname
221 for name, status
in six.iteritems(files):
222 fname = os.path.join(dirname, name)
223 if not fname
in self.duplicates:
224 print '\t\t %s: %s' % (name,
str(status))
226 print '\t\t %s: %s (Valid duplicate)' % (name,
str(status))
227 print 'Total entries in DBS: %i' % self.
eventsTotal 228 print 'Total entries in processed files: %i' % self.
eventsSeen 232 print 'Total entries in DBS not determined' 234 print "Bad Crab Jobs: '%s'" %
','.
join([
str(j)
for j
in self.bad_jobs])
def structured(self):
    """Assemble the test results into a JSON-serialisable report dict.

    NOTE(review): reconstructed from a garbled chunk — only the fields
    visible in this chunk are guaranteed; upstream may carry additional
    bookkeeping keys.
    """
    if self.test_result is None:
        self.test()

    totalGood = 0
    totalBad = 0

    report = {'PrimaryDataset': self.options.name,
              'PhysicsGroup': 'CMG',
              'PathList': [],
              'Files': {},
              'CreatedBy': self.options.user,
              'DateCreated': datetime.datetime.now().strftime("%s")}

    for dirname, files in self.test_result.items():
        report['PathList'].append(dirname)
        for name, status in files.items():
            fname = os.path.join(dirname, name)
            report['Files'][fname] = status
            if status[0]:
                totalGood += 1
            else:
                totalBad += 1

    if self.eventsTotal > 0:
        report['PrimaryDatasetFraction'] = float(self.eventsSeen) / float(self.eventsTotal)
    else:
        report['PrimaryDatasetFraction'] = -1.
    report['FilesGood'] = totalGood
    report['FilesBad'] = totalBad
    report['FilesCount'] = totalGood + totalBad

    report['BadJobs'] = self.bad_jobs
    report['ValidDuplicates'] = self.duplicates

    report['MinRun'] = self.options.min_run
    report['MaxRun'] = self.options.max_run
    return report
290 """Returns the CASTOR instance to use""" 291 return os.environ.get(
'STAGE_HOST',
'castorcms')
294 """Recursively list a file or directory on castor""" 295 return castortools.listFiles(dir,self.options.resursive)
298 """filter out filenames so that they only contain root files""" 299 return [f
for f
in self.
listFiles(dir)
if f.lower().endswith(
'.root')]
302 """Sort files into directories""" 305 dirname = os.path.dirname(f)
306 filename = os.path.basename(f)
307 if dirname
not in result: result[dirname] = []
308 result[dirname].
append(filename)
313 """Parse the output of edmFileUtil to get the number of events found""" 314 tokens = output.split(
' ')
317 result =
int(tokens[-4])
def testFile(self, lfn):
    """Run edmFileUtil on lfn and return (ok, n_events).

    Any known error string in the tool's output marks the file bad.
    """
    proc = subprocess.Popen(['edmFileUtil', lfn],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout = proc.communicate()[0]
    errors = ["Fatal Root Error",
              "Could not open file",
              "Not a valid collection"]
    for error in errors:
        if error in stdout:
            return (False, -1)
    return (True, self.getParseNumberOfEvents(stdout))
def testFileTimeOut(self, lfn, timeout):
    """Run testFile with a timeout (seconds); a negative timeout disables it.

    NOTE(review): reconstructed — only the TimedOutExc handler is visible
    in this chunk; the wrapper shape follows the timed_out decorator.
    """
    @timed_out(timeout)
    def tf(lfn):
        return self.testFile(lfn)

    if timeout < 0:
        return self.testFile(lfn)
    try:
        return tf(lfn)
    except TimedOutExc as e:
        print >> sys.stderr, "ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout)
        return (False, -1)
343 if __name__ ==
'__main__':
346 report = {
'DateCreated':
'123456',
'PathList':[
'/store/cmst3/user/wreece']}
348 print pub.get(
'/store/cmst3/user/wreece')
def read(self, lfn, local=False)
def listRootFiles(self, dir)
static void * communicate(void *obj)
def getParseNumberOfEvents(self, output)
def __init__(self, dataset, options)
def stripDuplicates(self)
static std::string join(char **cmd)
def sortByBaseDir(self, files)
def __init__(self, parent)
def test(self, previous=None, timeout=-1)
def publish(self, report)
def testFileTimeOut(self, lfn, timeout)
How EventSelector::AcceptEvent() decides whether to accept an event for output, otherwise excluding it, is determined by the probing of the criteria: A single or multiple positive criteria — the trigger will pass if any such matching triggers are PASS or EXCEPTION. [A criterion that matches no triggers at all is detected and causes a throw.] A single negative criterion — with an expectation of appropriate bit checking in the decision — the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run