CMS 3D CMS Logo

edmIntegrityCheck.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 """
3 Classes to check that a set of ROOT files are OK and publish a report
4 """
5 from __future__ import print_function
6 from __future__ import absolute_import
7 
8 from builtins import range
9 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
10 import subprocess
11 
12 from . import eostools as castortools
13 from .timeout import timed_out, TimedOutExc
14 from .castorBaseDir import castorBaseDir
15 from .dataset import CMSDataset
16 import six
17 
19  """Write a report to storage"""
20 
21  def __init__(self, parent):
22  if isinstance(parent, type("")):
23  self.parent = parent
24  else:
25  self.parent = parent.__class__.__name__
26 
def publish(self, report):
    """Publish a report as a JSON text file to every path in ``report['PathList']``.

    For each path the report is dumped to a local temporary file, renamed
    to ``<parent>_<DateCreated>.txt`` and copied with ``castortools.xrdcp``.
    If the copy cannot be confirmed with ``castortools.fileExists`` the
    local file is kept in the current directory under a name derived from
    the target path, and a message is written to stderr.
    """
    for path in report['PathList']:
        # mkstemp returns an already-open descriptor; wrap it so that it is
        # closed (and the JSON flushed) before the file is renamed/copied.
        fd, name = tempfile.mkstemp('.txt', text=True)
        with os.fdopen(fd, 'w') as handle:
            json.dump(report, handle, sort_keys=True, indent=4)

        fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
        #rename the file locally - TODO: This is a potential problem
        nname = os.path.join(os.path.dirname(name), fname)
        os.rename(name, nname)

        castor_path = castortools.lfnToCastor(path)
        new_name = '%s/%s' % (castor_path, fname)
        castortools.xrdcp(nname, path)
        # NOTE(review): presumably gives the storage system time to register
        # the new file before it is checked - confirm.
        time.sleep(1)

        if castortools.fileExists(new_name):
            print("File published: '%s'" % castortools.castorToLFN(new_name))
            os.remove(nname)
        else:
            pathhash = path.replace('/', '.')
            hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
            shutil.move(nname, hashed_name)
            print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)
55 
def read(self, lfn, local=False):
    """Read a report from storage and return it as decoded JSON.

    When ``local`` is true, ``lfn`` is opened as a path on the local file
    system; otherwise its content is fetched with ``castortools.cat``
    after converting it with ``castortools.lfnToCastor``.
    """
    if local:
        # open() instead of the Python-2-only file() builtin; the context
        # manager closes the handle as soon as the text has been read.
        with open(lfn) as handle:
            cat = handle.read()
    else:
        cat = castortools.cat(castortools.lfnToCastor(lfn))
    return json.loads(cat)
64 
def get(self, dir):
    """Find the latest report file in ``dir`` and read it.

    Candidates are the files that ``castortools.matchingFiles`` returns
    for the pattern ``^<parent>_.*\\.txt$``. The one whose base name sorts
    last is read. Returns None when nothing matches.
    """
    # Raw string: '\.' in a plain literal is an invalid escape sequence.
    reg = r'^%s_.*\.txt$' % self.parent
    files = castortools.matchingFiles(dir, reg)
    # Order by base name first so that the directory part does not
    # influence which file counts as the latest.
    files = sorted((os.path.basename(f), f) for f in files)
    if not files:
        return None
    return self.read(files[-1][1])
73 
74 
76 
def __init__(self, dataset, options):
    """Set up an integrity check for ``dataset`` using the given ``options``.

    The dataset name is stored with a leading path separator, and the
    storage directory is built from the user's base directory
    (``castorBaseDir``) followed by the dataset name components.
    """
    self.options = options
    self.dataset = dataset if dataset.startswith(os.sep) else os.sep + dataset

    base_dir = castorBaseDir(user=options.user)
    self.topdir = castortools.lfnToCastor(base_dir)
    name_parts = self.dataset.split(os.sep)
    self.directory = os.path.join(self.topdir, *name_parts)

    # event counters: -1 until query() has filled in the total
    self.eventsTotal = -1
    self.eventsSeen = 0

    # filled by test()
    self.test_result = None
91 
def query(self):
    """Query DAS to find out how many events are in the dataset.

    Stores the dataset name reported by the ``BaseDataset`` task in
    ``self.options.name`` and the entry count in ``self.eventsTotal``.
    Raises Exception when the task output has no usable 'Das' entry.
    """
    from .production_tasks import BaseDataset

    task = BaseDataset(self.dataset, self.options.user, self.options)
    output = task.run({})

    das_info = None
    if 'Das' in output:
        self.options.name = output['Name']
        das_info = output['Das']
    if das_info is None:
        raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)

    # get the number of events in the dataset
    opts = self.options
    self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(opts.name, opts.min_run, opts.max_run)
107 
def stripDuplicates(self):
    """Identify duplicated and missing CRAB job outputs in ``self.test_result``.

    A file counts as a CRAB output when its base name (extension removed)
    ends in ``_<jobid>_<retry>_<suffix>``. For every job id only the good
    file with the highest retry number is kept; the other good files of
    that job are reported as valid duplicates.

    Returns a tuple ``(good_duplicates, bad_jobs, sum_dup)``:
      * good_duplicates: {file path: second status element} for each
        superseded good file,
      * bad_jobs: sorted list of job ids between the smallest and largest
        id seen that have no good file,
      * sum_dup: sum of the values stored in ``good_duplicates``.
    """
    import re

    # Flatten {dirname: {name: status}} into {full path: status}.
    filemask = {}
    for dirname, files in self.test_result.items():
        for name, status in files.items():
            filemask[os.path.join(dirname, name)] = status

    # The indices are captured by the regex itself. Splitting on '_' broke
    # when the trailing suffix contained an underscore (which \w+ accepts):
    # int() was then applied to a non-numeric token and raised ValueError.
    crab_pattern = re.compile(r".*_(\d+)_(\d+)_\w+$")

    good_files = {}   # job id -> [(retry, path), ...] for files whose status is OK
    job_ids = set()   # every job id seen, regardless of file status
    for path, status in filemask.items():
        base, _ = os.path.splitext(os.path.basename(path))
        match = crab_pattern.match(base)
        if match is None:
            continue
        jobid, retry = int(match.group(1)), int(match.group(2))
        job_ids.add(jobid)
        if status[0]:
            good_files.setdefault(jobid, []).append((retry, path))

    good_duplicates = {}
    bad_jobs = []
    sum_dup = 0
    if job_ids:
        # Walk the full id range so that ids with no file at all are also
        # reported as bad; ascending order keeps bad_jobs sorted.
        for jobid in range(min(job_ids), max(job_ids) + 1):
            if jobid not in good_files:
                bad_jobs.append(jobid)
                continue
            # All but the highest retry are duplicates.
            for _, path in sorted(good_files[jobid])[:-1]:
                good_duplicates[path] = filemask[path][1]
                sum_dup += good_duplicates[path]
    return good_duplicates, bad_jobs, sum_dup
161 
def test(self, previous = None, timeout = -1):
    """Check every ROOT file below ``self.directory`` and record the results.

    ``previous`` may be an earlier report; files it lists as good are not
    re-tested. ``timeout`` is handed to ``testFileTimeOut`` for each file
    that is tested. Results are stored in ``self.test_result`` as
    {directory LFN: {file name: (OK, num)}}; ``self.duplicates`` and
    ``self.bad_jobs`` are filled from ``stripDuplicates`` and
    ``self.eventsSeen`` is updated.

    Raises Exception when ``self.directory`` does not exist.
    """
    if not castortools.fileExists(self.directory):
        raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

    self.query()

    # support updating to speed things up
    prev_results = {}
    if previous is not None:
        for lfn_key, old_status in six.iteritems(previous['Files']):
            prev_results[lfn_key] = old_status

    collected = {}
    by_directory = self.sortByBaseDir(self.listRootFiles(self.directory))
    for base_dir, filelist in six.iteritems(by_directory):
        # apply a UNIX wildcard if specified
        if self.options.wildcard is None:
            selected = filelist
        else:
            selected = fnmatch.filter(filelist, self.options.wildcard)
            if not selected:
                print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.directory), file=sys.stderr)

        statuses = {}
        for count, ff in enumerate(selected):
            fname = os.path.join(base_dir, ff)
            lfn = castortools.castorToLFN(fname)

            # try to update from the previous result if available
            if lfn in prev_results and prev_results[lfn][0]:
                if self.options.printout:
                    print('[%i/%i]\t Skipping %s...' % (count, len(selected),fname), end=' ')
                OK, num = prev_results[lfn]
            else:
                if self.options.printout:
                    print('[%i/%i]\t Checking %s...' % (count, len(selected),fname), end=' ')
                OK, num = self.testFileTimeOut(lfn, timeout)

            statuses[ff] = (OK, num)
            if self.options.printout:
                print((OK, num))
            if OK:
                self.eventsSeen += num
        collected[castortools.castorToLFN(base_dir)] = statuses
    self.test_result = collected

    self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
    # remove duplicate entries from the event count
    self.eventsSeen -= sum_dup
213 
def report(self):
    """Print a human-readable summary of the check to stdout.

    Runs ``test()`` first if no results are available yet.
    """
    if self.test_result is None:
        self.test()

    print('DBS Dataset name: %s' % self.options.name)
    print('Storage path: %s' % self.topdir)

    for dirname, files in six.iteritems(self.test_result):
        print('Directory: %s' % dirname)
        for name, status in six.iteritems(files):
            full_path = os.path.join(dirname, name)
            if full_path in self.duplicates:
                print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
            else:
                print('\t\t %s: %s' % (name, str(status)))

    print('Total entries in DBS: %i' % self.eventsTotal)
    print('Total entries in processed files: %i' % self.eventsSeen)
    if self.eventsTotal > 0:
        fraction = self.eventsSeen / (1. * self.eventsTotal)
        print('Fraction of dataset processed: %f' % fraction)
    else:
        print('Total entries in DBS not determined')

    if self.bad_jobs:
        job_list = ','.join([str(job) for job in self.bad_jobs])
        print("Bad Crab Jobs: '%s'" % job_list)
238 
def structured(self):
    """Return the check results as a dictionary.

    Runs ``test()`` first if no results are available yet. The dictionary
    carries fixed metadata fields, the per-file status map ('Files'), the
    directories seen ('PathList'), file and entry counters, the bad job
    ids, the valid duplicates and the run range taken from the options.
    'PrimaryDatasetFraction' is -1. when the dataset total is not positive.
    """
    if self.test_result is None:
        self.test()

    summary = {'data':{},
               'ReportVersion':3,
               'PrimaryDataset':self.options.name,
               'Name':self.dataset,
               'PhysicsGroup':'CMG',
               'Status':'VALID',
               'TierList':[],
               'AlgoList':[],
               'RunList':[],
               'PathList':[],
               'Topdir':self.topdir,
               'StageHost':self.stageHost(),
               'CreatedBy':self.options.user,
               # Seconds since the epoch as a string. time.time() is used
               # because the "%s" strftime directive is not a documented
               # Python format code and only works where the C library
               # happens to support it.
               'DateCreated':str(int(time.time())),
               'Files':{}}

    totalGood = 0
    totalBad = 0
    for dirname, files in self.test_result.items():
        summary['PathList'].append(dirname)
        for name, status in files.items():
            summary['Files'][os.path.join(dirname, name)] = status
            if status[0]:
                totalGood += 1
            else:
                totalBad += 1

    summary['PrimaryDatasetEntries'] = self.eventsTotal
    if self.eventsTotal > 0:
        summary['PrimaryDatasetFraction'] = self.eventsSeen / (1. * self.eventsTotal)
    else:
        summary['PrimaryDatasetFraction'] = -1.
    summary['FilesEntries'] = self.eventsSeen

    summary['FilesGood'] = totalGood
    summary['FilesBad'] = totalBad
    summary['FilesCount'] = totalGood + totalBad

    summary['BadJobs'] = self.bad_jobs
    summary['ValidDuplicates'] = self.duplicates

    summary['MinRun'] = self.options.min_run
    summary['MaxRun'] = self.options.max_run

    return summary
291 
def stageHost(self):
    """Return the CASTOR instance to use.

    Taken from the STAGE_HOST environment variable, falling back to
    'castorcms' when the variable is not set.
    """
    return os.getenv('STAGE_HOST', 'castorcms')
295 
def listFiles(self,dir):
    """List a file or directory on castor via ``castortools.listFiles``."""
    # NOTE(review): the option attribute is spelled 'resursive' here;
    # confirm against the option parser before renaming it.
    recurse = self.options.resursive
    return castortools.listFiles(dir, recurse)
299 
def listRootFiles(self,dir):
    """Return the entries of ``listFiles(dir)`` whose names end in '.root'.

    The extension check ignores case.
    """
    root_files = []
    for candidate in self.listFiles(dir):
        if candidate.lower().endswith('.root'):
            root_files.append(candidate)
    return root_files
303 
def sortByBaseDir(self,files):
    """Group file paths by directory.

    Returns {directory: [file names]}, with names kept in input order.
    """
    grouped = {}
    for path in files:
        directory, filename = os.path.split(path)
        grouped.setdefault(directory, []).append(filename)
    return grouped
313 
314 
def getParseNumberOfEvents(self,output):
    """Parse the output of edmFileUtil to get the number of events found.

    The count is taken from the fourth-from-last space-separated token of
    ``output``. Returns -2 when that token is missing or is not an integer.
    """
    tokens = output.split(' ')
    try:
        return int(tokens[-4])
    except (ValueError, IndexError):
        # IndexError: output too short to contain the summary line (for
        # example empty output); treated like an unparsable count.
        return -2
324 
def testFile(self,lfn):
    """Run ``edmFileUtil`` on ``lfn`` and classify the result.

    Returns (False, -1) when the tool's stdout contains one of the known
    error messages, otherwise (True, n) where n is the value that
    ``getParseNumberOfEvents`` extracts from that output.
    """
    # universal_newlines=True makes communicate() return text instead of
    # bytes on Python 3, so the substring checks below work there too.
    process = subprocess.Popen(['edmFileUtil', lfn],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True)
    stdout = process.communicate()[0]
    for error in ["Fatal Root Error","Could not open file","Not a valid collection"]:
        if error in stdout:
            return (False, -1)
    return (True, self.getParseNumberOfEvents(stdout))
330 
def testFileTimeOut(self,lfn, timeout):
    """Test ``lfn`` with ``testFile``, under a time limit when ``timeout`` > 0.

    With a positive ``timeout`` the check runs through a ``timed_out``
    wrapper; a ``TimedOutExc`` is reported on stderr and yields
    (False, -1). Otherwise ``testFile`` is called directly.
    """
    @timed_out(timeout)
    def guarded_test(target):
        try:
            return self.testFile(target)
        except TimedOutExc:
            print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (target,timeout), file=sys.stderr)
            return (False, -1)

    if timeout <= 0:
        return self.testFile(lfn)
    return guarded_test(lfn)
343 
344 
345 
if __name__ == '__main__':

    # Manual smoke test: publish a minimal report and read it back.
    target_dir = '/store/cmst3/user/wreece'
    publisher = PublishToFileSystem('Test')
    publisher.publish({'DateCreated':'123456','PathList':[target_dir]})
    print(publisher.get(target_dir))
def read(self, lfn, local=False)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static void * communicate(void *obj)
Definition: DQMNet.cc:1045
def __init__(self, dataset, options)
T min(T a, T b)
Definition: MathUtil.h:58
def timed_out(timeout)
Definition: timeout.py:23
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
def test(self, previous=None, timeout=-1)
#define str(s)
def testFileTimeOut(self, lfn, timeout)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run