edmIntegrityCheck.py
#!/usr/bin/env python
"""
Classes to check that a set of ROOT files are OK and publish a report
"""

import datetime, fnmatch, json, os, shutil, sys, tempfile, time
import subprocess

import eostools as castortools
from timeout import timed_out, TimedOutExc
from castorBaseDir import castorBaseDir
from dataset import CMSDataset

15  """Write a report to storage"""
16 
17  def __init__(self, parent):
18  if type(parent) == type(""):
19  self.parent = parent
20  else:
21  self.parent = parent.__class__.__name__
22 
    def publish(self, report):
        """Publish a file"""
        for path in report['PathList']:
            fd, name = tempfile.mkstemp('.txt', text=True)
            os.close(fd) #close the descriptor; json.dump opens its own handle below
            json.dump(report, file(name, 'w'), sort_keys=True, indent=4)

            fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
            #rename the file locally - TODO: This is a potential problem
            nname = os.path.join(os.path.dirname(name), fname)
            os.rename(name, nname)

            castor_path = castortools.lfnToCastor(path)
            new_name = '%s/%s' % (castor_path, fname)
            castortools.xrdcp(nname, path)
            time.sleep(1)

            if castortools.fileExists(new_name):
                #castortools.move(old_name, new_name)
                #castortools.chmod(new_name, '644')
                print "File published: '%s'" % castortools.castorToLFN(new_name)
                os.remove(nname)
            else:
                #fall back to a local file whose name encodes the target path
                pathhash = path.replace('/', '.')
                hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
                shutil.move(nname, hashed_name)
                print >> sys.stderr, "Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name)

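    # Hypothetical example of the fallback above: publishing to
    # '/store/cmst3/user/wreece' without write access leaves a local file
    # named 'PublishToFileSystem-.store.cmst3.user.wreece-Test_123456.txt'
    # (for parent 'Test' and DateCreated '123456').
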
    def read(self, lfn, local = False):
        """Reads a report from storage"""
        if local:
            cat = file(lfn).read()
        else:
            cat = castortools.cat(castortools.lfnToCastor(lfn))
        #print "the cat is: ", cat
        return json.loads(cat)

    def get(self, dir):
        """Finds the latest report file in a directory and reads it"""
        reg = r'^%s_.*\.txt$' % self.parent
        files = castortools.matchingFiles(dir, reg)
        files = sorted([ (os.path.basename(f), f) for f in files])
        if not files:
            return None
        return self.read(files[-1][1])
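    # Reports sort lexicographically by filename; DateCreated is an epoch
    # timestamp (strftime('%s') in IntegrityCheck.structured below), so the
    # last element is the most recent report.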


class IntegrityCheck(object):

    def __init__(self, dataset, options):
        #normalise the dataset name so that it always starts with a separator
        if not dataset.startswith(os.sep):
            dataset = os.sep + dataset

        self.dataset = dataset
        self.options = options
        self.topdir = castortools.lfnToCastor( castorBaseDir(user=options.user) )
        self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))

        #event counters
        self.eventsTotal = -1
        self.eventsSeen = 0

        self.test_result = None

    def query(self):
        """Query DAS to find out how many events are in the dataset"""
        from production_tasks import BaseDataset
        base = BaseDataset(self.dataset, self.options.user, self.options)

        data = None
        output = base.run({})
        if 'Das' in output:
            self.options.name = output['Name']
            data = output['Das']

        if data is None:
            raise Exception("Dataset '%s' not found in DAS. Please check." % self.dataset)
        #get the number of events in the dataset
        self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)

    def stripDuplicates(self):
        """Identify valid duplicates among CRAB outputs and flag missing job ids"""

        import re

        filemask = {}
        for dirname, files in self.test_result.iteritems():
            for name, status in files.iteritems():
                fname = os.path.join(dirname, name)
                filemask[fname] = status

        def isCrabFile(name):
            _, fname = os.path.split(name)
            base, _ = os.path.splitext(fname)
            return re.match(r".*_\d+_\d+_\w+$", base) is not None, base
        def getCrabIndex(base):
            tokens = base.split('_')
            if len(tokens) > 2:
                return (int(tokens[-3]), int(tokens[-2]))
            return None

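        # Hypothetical example: a CRAB output 'output_12_3_aBcD.root' has base
        # 'output_12_3_aBcD', so getCrabIndex returns (12, 3): job id 12,
        # retry 3. Per job id only good files are kept, and the highest retry
        # wins; earlier retries are reported as valid duplicates.
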
        files = {}

        mmin = 1000000000
        mmax = -100000000
        for f in filemask:
            isCrab, base = isCrabFile(f)
            if isCrab:
                index = getCrabIndex(base)
                if index is not None:
                    jobid, retry = index

                    mmin = min(mmin, jobid)
                    mmax = max(mmax, jobid)
                    #only track files that passed the integrity test
                    if jobid in files and filemask[f][0]:
                        files[jobid].append((retry, f))
                    elif filemask[f][0]:
                        files[jobid] = [(retry, f)]

        good_duplicates = {}
        bad_jobs = set()
        sum_dup = 0
        for i in xrange(mmin, mmax+1):
            if i in files:
                duplicates = sorted(files[i])
                #the highest retry (last after sorting) is kept; earlier
                #retries are recorded so their events can be subtracted
                if len(duplicates) > 1:
                    for d in duplicates[:-1]:
                        good_duplicates[d[1]] = filemask[d[1]][1]
                        sum_dup += good_duplicates[d[1]]
            else:
                bad_jobs.add(i)
        return good_duplicates, sorted(list(bad_jobs)), sum_dup

    def test(self, previous = None, timeout = -1):
        if not castortools.fileExists(self.directory):
            raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

        self.query()

        test_results = {}

        #support updating to speed things up
        prev_results = {}
        if previous is not None:
            for name, status in previous['Files'].iteritems():
                prev_results[name] = status

        filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
        for dir, filelist in filesToTest.iteritems():
            filemask = {}
            #apply a UNIX wildcard if specified
            filtered = filelist
            if self.options.wildcard is not None:
                filtered = fnmatch.filter(filelist, self.options.wildcard)
                if not filtered:
                    print >> sys.stderr, "Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard, self.directory)

            count = 0
            for ff in filtered:
                fname = os.path.join(dir, ff)
                lfn = castortools.castorToLFN(fname)

                #try to update from the previous result if available
                if lfn in prev_results and prev_results[lfn][0]:
                    if self.options.printout:
                        print '[%i/%i]\t Skipping %s...' % (count, len(filtered), fname),
                    OK, num = prev_results[lfn]
                else:
                    if self.options.printout:
                        print '[%i/%i]\t Checking %s...' % (count, len(filtered), fname),
                    OK, num = self.testFileTimeOut(lfn, timeout)

                filemask[ff] = (OK, num)
                if self.options.printout:
                    print (OK, num)
                if OK:
                    self.eventsSeen += num
                count += 1
            test_results[castortools.castorToLFN(dir)] = filemask
        self.test_result = test_results

        self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
        #remove duplicate entries from the event count
        self.eventsSeen -= sum_dup

    def report(self):

        if self.test_result is None:
            self.test()

        print 'DBS Dataset name: %s' % self.options.name
        print 'Storage path: %s' % self.topdir

        for dirname, files in self.test_result.iteritems():
            print 'Directory: %s' % dirname
            for name, status in files.iteritems():
                fname = os.path.join(dirname, name)
                if fname not in self.duplicates:
                    print '\t\t %s: %s' % (name, str(status))
                else:
                    print '\t\t %s: %s (Valid duplicate)' % (name, str(status))
        print 'Total entries in DBS: %i' % self.eventsTotal
        print 'Total entries in processed files: %i' % self.eventsSeen
        if self.eventsTotal > 0:
            print 'Fraction of dataset processed: %f' % (self.eventsSeen/(1.*self.eventsTotal))
        else:
            print 'Total entries in DBS not determined'
        if self.bad_jobs:
            print "Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs])

    def structured(self):

        if self.test_result is None:
            self.test()

        totalGood = 0
        totalBad = 0

        report = {'data':{},
                  'ReportVersion':3,
                  'PrimaryDataset':self.options.name,
                  'Name':self.dataset,
                  'PhysicsGroup':'CMG',
                  'Status':'VALID',
                  'TierList':[],
                  'AlgoList':[],
                  'RunList':[],
                  'PathList':[],
                  'Topdir':self.topdir,
                  'StageHost':self.stageHost(),
                  'CreatedBy':self.options.user,
                  'DateCreated':datetime.datetime.now().strftime("%s"),
                  'Files':{}}

        for dirname, files in self.test_result.iteritems():
            report['PathList'].append(dirname)
            for name, status in files.iteritems():
                fname = os.path.join(dirname, name)
                report['Files'][fname] = status
                if status[0]:
                    totalGood += 1
                else:
                    totalBad += 1

        report['PrimaryDatasetEntries'] = self.eventsTotal
        if self.eventsTotal > 0:
            report['PrimaryDatasetFraction'] = (self.eventsSeen/(1.*self.eventsTotal))
        else:
            report['PrimaryDatasetFraction'] = -1.
        report['FilesEntries'] = self.eventsSeen

        report['FilesGood'] = totalGood
        report['FilesBad'] = totalBad
        report['FilesCount'] = totalGood + totalBad

        report['BadJobs'] = self.bad_jobs
        report['ValidDuplicates'] = self.duplicates

        report['MinRun'] = self.options.min_run
        report['MaxRun'] = self.options.max_run

        return report

    def stageHost(self):
        """Returns the CASTOR instance to use"""
        return os.environ.get('STAGE_HOST', 'castorcms')

    def listFiles(self, dir):
        """Recursively list a file or directory on castor"""
        return castortools.listFiles(dir, self.options.recursive)

    def listRootFiles(self, dir):
        """Filter the file listing so that it only contains ROOT files"""
        return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]

    def sortByBaseDir(self, files):
        """Sort files into directories"""
        result = {}
        for f in files:
            dirname = os.path.dirname(f)
            filename = os.path.basename(f)
            if dirname not in result: result[dirname] = []
            result[dirname].append(filename)
        return result

    def getParseNumberOfEvents(self, output):
        """Parse the output of edmFileUtil to get the number of events found"""
        tokens = output.split(' ')
        result = -2
        try:
            result = int(tokens[-4])
        except ValueError:
            pass
        return result

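    # Hypothetical example: if the edmFileUtil summary ends in something like
    # '... 2 runs, 10 lumis, 5000 events, 1234567 bytes', then tokens[-4] is
    # '5000'; -2 signals that no event count could be parsed.
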
    def testFile(self, lfn):
        """Run edmFileUtil on an LFN and scan its output for known error strings"""
        stdout = subprocess.Popen(['edmFileUtil', lfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
        for error in ["Fatal Root Error", "Could not open file", "Not a valid collection"]:
            if error in stdout: return (False, -1)
        return (True, self.getParseNumberOfEvents(stdout))

    def testFileTimeOut(self, lfn, timeout):
        """Run testFile, aborting after 'timeout' seconds if timeout is positive"""
        @timed_out(timeout)
        def tf(lfn):
            try:
                return self.testFile(lfn)
            except TimedOutExc as e:
                print >> sys.stderr, "ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn, timeout)
                return (False, -1)
        if timeout > 0:
            return tf(lfn)
        else:
            return self.testFile(lfn)


if __name__ == '__main__':

    pub = PublishToFileSystem('Test')
    report = {'DateCreated':'123456', 'PathList':['/store/cmst3/user/wreece']}
    pub.publish(report)
    print pub.get('/store/cmst3/user/wreece')
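
    # A minimal sketch of driving IntegrityCheck as well; 'Opts' and its
    # attribute values are hypothetical stand-ins for the option object that
    # the calling script normally provides:
    #
    # class Opts(object):
    #     user = 'wreece'; name = None; wildcard = None; printout = True
    #     min_run = -1; max_run = -1; recursive = True
    # check = IntegrityCheck('/MyDataset/name/TIER', Opts())
    # check.test()
    # check.report()
    # pub.publish(check.structured())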