CMS 3D CMS Logo

edmIntegrityCheck.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 """
3 Classes to check that a set of ROOT files are OK and publish a report
4 """
5 
6 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
7 import subprocess
8 
9 import eostools as castortools
10 from timeout import timed_out, TimedOutExc
11 from castorBaseDir import castorBaseDir
12 from dataset import CMSDataset
13 import six
14 
16  """Write a report to storage"""
17 
18  def __init__(self, parent):
19  if isinstance(parent, type("")):
20  self.parent = parent
21  else:
22  self.parent = parent.__class__.__name__
23 
24  def publish(self, report):
25  """Publish a file"""
26  for path in report['PathList']:
27  _, name = tempfile.mkstemp('.txt', text=True)
28  json.dump(report, file(name,'w'), sort_keys=True, indent=4)
29 
30  fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
31  #rename the file locally - TODO: This is a potential problem
32  nname = os.path.join(os.path.dirname(name),fname)
33  os.rename(name, nname)
34 
35  castor_path = castortools.lfnToCastor(path)
36  new_name = '%s/%s' % (castor_path, fname)
37  castortools.xrdcp(nname,path)
38  time.sleep(1)
39 
40  if castortools.fileExists(new_name):
41 
42  #castortools.move(old_name, new_name)
43  #castortools.chmod(new_name, '644')
44 
45  print "File published: '%s'" % castortools.castorToLFN(new_name)
46  os.remove(nname)
47  else:
48  pathhash = path.replace('/','.')
49  hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
50  shutil.move(nname, hashed_name)
51  print >> sys.stderr, "Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name)
52 
53  def read(self, lfn, local = False):
54  """Reads a report from storage"""
55  if local:
56  cat = file(lfn).read()
57  else:
58  cat = castortools.cat(castortools.lfnToCastor(lfn))
59  #print "the cat is: ", cat
60  return json.loads(cat)
61 
62  def get(self, dir):
63  """Finds the lastest file and reads it"""
64  reg = '^%s_.*\.txt$' % self.parent
65  files = castortools.matchingFiles(dir, reg)
66  files = sorted([ (os.path.basename(f), f) for f in files])
67  if not files:
68  return None
69  return self.read(files[-1][1])
70 
71 
73 
74  def __init__(self, dataset, options):
75  if not dataset.startswith(os.sep):
76  dataset = os.sep + dataset
77 
78  self.dataset = dataset
79  self.options = options
80  self.topdir = castortools.lfnToCastor( castorBaseDir(user=options.user) )
81  self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))
82 
83  #event counters
84  self.eventsTotal = -1
85  self.eventsSeen = 0
86 
87  self.test_result = None
88 
89  def query(self):
90  """Query DAS to find out how many events are in the dataset"""
91  from production_tasks import BaseDataset
92  base = BaseDataset(self.dataset, self.options.user, self.options)
93 
94  data = None
95  output = base.run({})
96  if 'Das' in output:
97  self.options.name = output['Name']
98  data = output['Das']
99 
100  if data is None:
101  raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
102  #get the number of events in the dataset
103  self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)
104 
105  def stripDuplicates(self):
106 
107  import re
108 
109  filemask = {}
110  for dirname, files in six.iteritems(self.test_result):
111  for name, status in six.iteritems(files):
112  fname = os.path.join(dirname, name)
113  filemask[fname] = status
114 
115  def isCrabFile(name):
116  _, fname = os.path.split(name)
117  base, _ = os.path.splitext(fname)
118  return re.match(".*_\d+_\d+_\w+$", base) is not None, base
119  def getCrabIndex(base):
120  tokens = base.split('_')
121  if len(tokens) > 2:
122  return (int(tokens[-3]), int(tokens[-2]))
123  return None
124 
125  files = {}
126 
127  mmin = 1000000000
128  mmax = -100000000
129  for f in filemask:
130  isCrab, base = isCrabFile(f)
131  if isCrab:
132  index = getCrabIndex(base)
133  if index is not None:
134  jobid, retry = index
135 
136  mmin = min(mmin, jobid)
137  mmax = max(mmax, jobid)
138  if jobid in files and filemask[f][0]:
139  files[jobid].append((retry, f))
140  elif filemask[f][0]:
141  files[jobid] = [(retry, f)]
142 
143  good_duplicates = {}
144  bad_jobs = set()
145  sum_dup = 0
146  for i in xrange(mmin, mmax+1):
147  if i in files:
148  duplicates = sorted(files[i])
149 
150  fname = duplicates[-1][1]
151  if len(duplicates) > 1:
152  for d in duplicates[:-1]:
153  good_duplicates[d[1]] = filemask[d[1]][1]
154  sum_dup += good_duplicates[d[1]]
155  else:
156  bad_jobs.add(i)
157  return good_duplicates, sorted(list(bad_jobs)), sum_dup
158 
    def test(self, previous = None, timeout = -1):
        """Check every ROOT file of the dataset and record the results.

        previous : an earlier report dict (with a 'Files' entry); files
                   already marked good there are skipped to save time.
        timeout  : per-file timeout in seconds passed to testFileTimeOut;
                   non-positive disables the limit.

        Side effects: fills self.test_result ({LFN dir: {file: (ok, n)}}),
        self.duplicates, self.bad_jobs, and accumulates self.eventsSeen
        (with duplicate files subtracted out).  Raises if the dataset
        directory does not exist.
        """
        if not castortools.fileExists(self.directory):
            raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

        # fills self.eventsTotal and self.options.name from DAS
        self.query()

        test_results = {}

        #support updating to speed things up
        prev_results = {}
        if previous is not None:
            for name, status in six.iteritems(previous['Files']):
                prev_results[name] = status

        filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
        for dir, filelist in six.iteritems(filesToTest):
            filemask = {}
            #apply a UNIX wildcard if specified
            filtered = filelist
            if self.options.wildcard is not None:
                filtered = fnmatch.filter(filelist, self.options.wildcard)
                if not filtered:
                    print >> sys.stderr, "Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.directory)

            count = 0
            for ff in filtered:
                fname = os.path.join(dir, ff)
                lfn = castortools.castorToLFN(fname)

                #try to update from the previous result if available
                if lfn in prev_results and prev_results[lfn][0]:
                    if self.options.printout:
                        # trailing comma: stay on the same line as the
                        # (OK, num) printed below
                        print '[%i/%i]\t Skipping %s...' % (count, len(filtered),fname),
                    OK, num = prev_results[lfn]
                else:
                    if self.options.printout:
                        print '[%i/%i]\t Checking %s...' % (count, len(filtered),fname),
                    OK, num = self.testFileTimeOut(lfn, timeout)

                filemask[ff] = (OK,num)
                if self.options.printout:
                    print (OK, num)
                if OK:
                    self.eventsSeen += num
                count += 1
            test_results[castortools.castorToLFN(dir)] = filemask
        self.test_result = test_results

        self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
        #remove duplicate entries from the event count
        self.eventsSeen -= sum_dup
210 
211  def report(self):
212 
213  if self.test_result is None:
214  self.test()
215 
216  print 'DBS Dataset name: %s' % self.options.name
217  print 'Storage path: %s' % self.topdir
218 
219  for dirname, files in six.iteritems(self.test_result):
220  print 'Directory: %s' % dirname
221  for name, status in six.iteritems(files):
222  fname = os.path.join(dirname, name)
223  if not fname in self.duplicates:
224  print '\t\t %s: %s' % (name, str(status))
225  else:
226  print '\t\t %s: %s (Valid duplicate)' % (name, str(status))
227  print 'Total entries in DBS: %i' % self.eventsTotal
228  print 'Total entries in processed files: %i' % self.eventsSeen
229  if self.eventsTotal>0:
230  print 'Fraction of dataset processed: %f' % (self.eventsSeen/(1.*self.eventsTotal))
231  else:
232  print 'Total entries in DBS not determined'
233  if self.bad_jobs:
234  print "Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs])
235 
236  def structured(self):
237 
238  if self.test_result is None:
239  self.test()
240 
241  totalGood = 0
242  totalBad = 0
243 
244  report = {'data':{},
245  'ReportVersion':3,
246  'PrimaryDataset':self.options.name,
247  'Name':self.dataset,
248  'PhysicsGroup':'CMG',
249  'Status':'VALID',
250  'TierList':[],
251  'AlgoList':[],
252  'RunList':[],
253  'PathList':[],
254  'Topdir':self.topdir,
255  'StageHost':self.stageHost(),
256  'CreatedBy':self.options.user,
257  'DateCreated':datetime.datetime.now().strftime("%s"),
258  'Files':{}}
259 
260  for dirname, files in six.iteritems(self.test_result):
261  report['PathList'].append(dirname)
262  for name, status in six.iteritems(files):
263  fname = os.path.join(dirname, name)
264  report['Files'][fname] = status
265  if status[0]:
266  totalGood += 1
267  else:
268  totalBad += 1
269 
270  report['PrimaryDatasetEntries'] = self.eventsTotal
271  if self.eventsTotal>0:
272  report['PrimaryDatasetFraction'] = (self.eventsSeen/(1.*self.eventsTotal))
273  else:
274  report['PrimaryDatasetFraction'] = -1.
275  report['FilesEntries'] = self.eventsSeen
276 
277  report['FilesGood'] = totalGood
278  report['FilesBad'] = totalBad
279  report['FilesCount'] = totalGood + totalBad
280 
281  report['BadJobs'] = self.bad_jobs
282  report['ValidDuplicates'] = self.duplicates
283 
284  report['MinRun'] = self.options.min_run
285  report['MaxRun'] = self.options.max_run
286 
287  return report
288 
289  def stageHost(self):
290  """Returns the CASTOR instance to use"""
291  return os.environ.get('STAGE_HOST','castorcms')
292 
    def listFiles(self,dir):
        """Recursively list a file or directory on castor"""
        # NOTE(review): 'resursive' looks like a typo for 'recursive', but it
        # must match the attribute name declared by the option parser (not
        # visible here) - confirm before renaming.
        return castortools.listFiles(dir,self.options.resursive)
296 
297  def listRootFiles(self,dir):
298  """filter out filenames so that they only contain root files"""
299  return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]
300 
301  def sortByBaseDir(self,files):
302  """Sort files into directories"""
303  result = {}
304  for f in files:
305  dirname = os.path.dirname(f)
306  filename = os.path.basename(f)
307  if dirname not in result: result[dirname] = []
308  result[dirname].append(filename)
309  return result
310 
311 
312  def getParseNumberOfEvents(self,output):
313  """Parse the output of edmFileUtil to get the number of events found"""
314  tokens = output.split(' ')
315  result = -2
316  try:
317  result = int(tokens[-4])
318  except ValueError:
319  pass
320  return result
321 
322  def testFile(self,lfn):
323  stdout = subprocess.Popen(['edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0]
324  for error in ["Fatal Root Error","Could not open file","Not a valid collection"]:
325  if error in stdout: return (False,-1)
326  return (True,self.getParseNumberOfEvents(stdout))
327 
    def testFileTimeOut(self,lfn, timeout):
        """Check a file like testFile, but abort after 'timeout' seconds.

        A non-positive timeout disables the limit and calls testFile
        directly.  A file that times out is reported bad: (False, -1).
        """
        # the timed_out decorator raises TimedOutExc inside the wrapped
        # call once 'timeout' seconds have elapsed; we catch it here and
        # translate it into a failed-file result
        @timed_out(timeout)
        def tf(lfn):
            try:
                return self.testFile(lfn)
            except TimedOutExc as e:
                print >> sys.stderr, "ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout)
                return (False,-1)
        if timeout > 0:
            return tf(lfn)
        else:
            return self.testFile(lfn)
340 
341 
342 
if __name__ == '__main__':

    # simple smoke test: publish a dummy report to storage and read the
    # most recent report back from the same path
    pub = PublishToFileSystem('Test')
    report = {'DateCreated':'123456','PathList':['/store/cmst3/user/wreece']}
    pub.publish(report)
    print pub.get('/store/cmst3/user/wreece')
def read(self, lfn, local=False)
static void * communicate(void *obj)
Definition: DQMNet.cc:1251
def __init__(self, dataset, options)
T min(T a, T b)
Definition: MathUtil.h:58
def timed_out(timeout)
Definition: timeout.py:23
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def test(self, previous=None, timeout=-1)
#define str(s)
def testFileTimeOut(self, lfn, timeout)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run