edmIntegrityCheck.py — classes to check that a set of ROOT files are OK and to publish a report.
1 #!/usr/bin/env python
2 """
3 Classes to check that a set of ROOT files are OK and publish a report
4 """
5 from __future__ import print_function
6 
7 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
8 import subprocess
9 
10 import eostools as castortools
11 from timeout import timed_out, TimedOutExc
12 from castorBaseDir import castorBaseDir
13 from dataset import CMSDataset
14 import six
15 
17  """Write a report to storage"""
18 
def __init__(self, parent):
    """Record the owner name used to tag published reports.

    `parent` may be the name itself (a string) or any object, in which
    case the object's class name is used.
    """
    is_name = isinstance(parent, type(""))
    self.parent = parent if is_name else parent.__class__.__name__
24 
def publish(self, report):
    """Publish `report` (a dict) as a JSON text file into every LFN directory
    listed in report['PathList'].

    For each path the report is dumped to a local temp file named
    '<parent>_<DateCreated>.txt', copied to storage with xrdcp, and the
    local copy is removed once the remote copy is confirmed.  On failure
    the file is kept locally under a path-derived name so it can be
    recovered, and a warning is written to stderr.
    """
    for path in report['PathList']:
        _, name = tempfile.mkstemp('.txt', text=True)
        # file() was Python 2 only and leaked the handle; use a context manager.
        with open(name, 'w') as fp:
            json.dump(report, fp, sort_keys=True, indent=4)

        fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
        #rename the file locally - TODO: This is a potential problem
        nname = os.path.join(os.path.dirname(name), fname)
        os.rename(name, nname)

        castor_path = castortools.lfnToCastor(path)
        new_name = '%s/%s' % (castor_path, fname)
        castortools.xrdcp(nname, path)
        # Give the remote file system a moment to register the copy.
        time.sleep(1)

        if castortools.fileExists(new_name):
            print("File published: '%s'" % castortools.castorToLFN(new_name))
            os.remove(nname)
        else:
            # Could not publish remotely: keep a uniquely named local copy instead.
            pathhash = path.replace('/', '.')
            hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
            shutil.move(nname, hashed_name)
            print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)
53 
def read(self, lfn, local=False):
    """Read a previously published JSON report and return it as a dict.

    If `local` is True, `lfn` is treated as a plain local path; otherwise
    the file content is fetched from storage via castortools.
    """
    if local:
        # file() was Python 2 only and leaked the handle; use a context manager.
        with open(lfn) as fp:
            cat = fp.read()
    else:
        cat = castortools.cat(castortools.lfnToCastor(lfn))
    return json.loads(cat)
62 
def get(self, dir):
    """Find the most recent report file for this parent in `dir` and read it.

    Report files are named '<parent>_<timestamp>.txt'; a lexicographic
    sort of the basenames selects the latest one.  Returns None when no
    report exists.
    """
    # Raw string: '\.' is an invalid escape sequence in a plain literal
    # on Python 3.6+ (and the intent is a literal dot).
    reg = r'^%s_.*\.txt$' % self.parent
    files = castortools.matchingFiles(dir, reg)
    files = sorted([(os.path.basename(f), f) for f in files])
    if not files:
        return None
    return self.read(files[-1][1])
71 
72 
74 
def __init__(self, dataset, options):
    """Prepare an integrity check for `dataset` owned by options.user."""
    # Normalise the dataset name so it always begins with a separator.
    if not dataset.startswith(os.sep):
        dataset = os.sep + dataset

    self.dataset = dataset
    self.options = options
    self.topdir = castortools.lfnToCastor(castorBaseDir(user=options.user))
    self.directory = os.path.join(self.topdir, *dataset.split(os.sep))

    # Event counters: -1 means the DBS total is not yet known.
    self.eventsTotal = -1
    self.eventsSeen = 0

    # Filled in by test(); None means the check has not been run.
    self.test_result = None
89 
def query(self):
    """Query DAS to find out how many events are in the dataset."""
    from production_tasks import BaseDataset

    base = BaseDataset(self.dataset, self.options.user, self.options)
    output = base.run({})

    data = None
    if 'Das' in output:
        # The query also resolves the canonical dataset name.
        self.options.name = output['Name']
        data = output['Das']
    if data is None:
        raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)

    # Get the number of events registered for the dataset.
    self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(
        self.options.name, self.options.min_run, self.options.max_run)
105 
def stripDuplicates(self):
    """Identify CRAB retry duplicates and missing jobs among tested files.

    Flattens self.test_result into {full_path: (ok, nevents)}, then groups
    files that passed the test by CRAB job id (parsed from file names of
    the form '<base>_<jobid>_<retry>_<hash>').  For each job only the
    highest retry is kept; earlier retries are valid duplicates.  Job ids
    within the observed range that have no good file at all are bad jobs.

    Returns (good_duplicates, bad_jobs, sum_dup):
      good_duplicates -- {duplicate_path: event_count}
      bad_jobs        -- sorted list of job ids with no good output
      sum_dup         -- total events in duplicates (to be subtracted
                         from the seen-event count by the caller)
    """
    import re

    filemask = {}
    for dirname, files in self.test_result.items():
        for name, status in files.items():
            filemask[os.path.join(dirname, name)] = status

    # Raw string: '\d'/'\w' are invalid escapes in a plain literal (py3.6+).
    crab_re = re.compile(r".*_\d+_\d+_\w+$")

    def isCrabFile(name):
        # (looks_like_crab_output, basename_without_extension)
        _, fname = os.path.split(name)
        base, _ = os.path.splitext(fname)
        return crab_re.match(base) is not None, base

    def getCrabIndex(base):
        # Extract (jobid, retry) from '..._<jobid>_<retry>_<hash>'.
        tokens = base.split('_')
        if len(tokens) > 2:
            return (int(tokens[-3]), int(tokens[-2]))
        return None

    files = {}
    mmin = 1000000000
    mmax = -100000000
    for f in filemask:
        isCrab, base = isCrabFile(f)
        if not isCrab:
            continue
        index = getCrabIndex(base)
        if index is None:
            continue
        jobid, retry = index

        # Track the job-id range over ALL crab files, good or bad.
        mmin = min(mmin, jobid)
        mmax = max(mmax, jobid)
        # Only files that passed the integrity test take part in
        # duplicate resolution.
        if filemask[f][0]:
            files.setdefault(jobid, []).append((retry, f))

    good_duplicates = {}
    bad_jobs = set()
    sum_dup = 0
    # range() instead of Python 2-only xrange(); works on both.
    for i in range(mmin, mmax + 1):
        if i not in files:
            # No good file at all for this job id.
            bad_jobs.add(i)
            continue
        duplicates = sorted(files[i])
        # The highest retry wins; everything before it is a valid duplicate.
        for retry, fname in duplicates[:-1]:
            good_duplicates[fname] = filemask[fname][1]
            sum_dup += good_duplicates[fname]
    return good_duplicates, sorted(bad_jobs), sum_dup
159 
def test(self, previous=None, timeout=-1):
    """Run the integrity check over every ROOT file in the dataset directory.

    previous -- a previously produced structured report (dict); files
                already marked good there are skipped to speed up re-runs.
    timeout  -- per-file timeout in seconds passed to testFileTimeOut
                (values <= 0 disable the timeout).

    Populates self.test_result, self.duplicates, self.bad_jobs and the
    event counters as side effects; returns None.
    """
    if not castortools.fileExists(self.directory):
        raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

    # Fills self.eventsTotal (and self.options.name) from DAS/DBS.
    self.query()

    test_results = {}

    #support updating to speed things up
    # Map of lfn -> (ok, nevents) from the previous report, if any.
    prev_results = {}
    if previous is not None:
        for name, status in six.iteritems(previous['Files']):
            prev_results[name] = status

    filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
    for dir, filelist in six.iteritems(filesToTest):
        filemask = {}
        #apply a UNIX wildcard if specified
        filtered = filelist
        if self.options.wildcard is not None:
            filtered = fnmatch.filter(filelist, self.options.wildcard)
            if not filtered:
                print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.directory), file=sys.stderr)

        count = 0
        for ff in filtered:
            fname = os.path.join(dir, ff)
            lfn = castortools.castorToLFN(fname)

            #try to update from the previous result if available
            # Only good files are skipped; bad files get re-tested.
            if lfn in prev_results and prev_results[lfn][0]:
                if self.options.printout:
                    print('[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=' ')
                OK, num = prev_results[lfn]
            else:
                if self.options.printout:
                    print('[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=' ')
                OK, num = self.testFileTimeOut(lfn, timeout)

            filemask[ff] = (OK,num)
            if self.options.printout:
                print((OK, num))
            if OK:
                self.eventsSeen += num
            count += 1
        test_results[castortools.castorToLFN(dir)] = filemask
    self.test_result = test_results

    self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
    #remove duplicate entries from the event count
    self.eventsSeen -= sum_dup
211 
def report(self):
    """Print a human-readable summary of the integrity check to stdout."""
    # Lazily run the check if it has not been done yet.
    if self.test_result is None:
        self.test()

    print('DBS Dataset name: %s' % self.options.name)
    print('Storage path: %s' % self.topdir)

    for dirname, files in self.test_result.items():
        print('Directory: %s' % dirname)
        for name, status in files.items():
            fname = os.path.join(dirname, name)
            if fname in self.duplicates:
                print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
            else:
                print('\t\t %s: %s' % (name, str(status)))

    print('Total entries in DBS: %i' % self.eventsTotal)
    print('Total entries in processed files: %i' % self.eventsSeen)
    if self.eventsTotal > 0:
        print('Fraction of dataset processed: %f' % (self.eventsSeen / (1. * self.eventsTotal)))
    else:
        print('Total entries in DBS not determined')
    if self.bad_jobs:
        print("Bad Crab Jobs: '%s'" % ','.join(str(j) for j in self.bad_jobs))
236 
def structured(self):
    """Build and return the machine-readable report dict (ReportVersion 3)."""
    # Lazily run the check if it has not been done yet.
    if self.test_result is None:
        self.test()

    totalGood = 0
    totalBad = 0

    report = {
        'data': {},
        'ReportVersion': 3,
        'PrimaryDataset': self.options.name,
        'Name': self.dataset,
        'PhysicsGroup': 'CMG',
        'Status': 'VALID',
        'TierList': [],
        'AlgoList': [],
        'RunList': [],
        'PathList': [],
        'Topdir': self.topdir,
        'StageHost': self.stageHost(),
        'CreatedBy': self.options.user,
        'DateCreated': datetime.datetime.now().strftime("%s"),
        'Files': {},
    }

    for dirname, files in self.test_result.items():
        report['PathList'].append(dirname)
        for name, status in files.items():
            report['Files'][os.path.join(dirname, name)] = status
            if status[0]:
                totalGood += 1
            else:
                totalBad += 1

    report['PrimaryDatasetEntries'] = self.eventsTotal
    if self.eventsTotal > 0:
        report['PrimaryDatasetFraction'] = self.eventsSeen / (1. * self.eventsTotal)
    else:
        report['PrimaryDatasetFraction'] = -1.
    report['FilesEntries'] = self.eventsSeen

    report['FilesGood'] = totalGood
    report['FilesBad'] = totalBad
    report['FilesCount'] = totalGood + totalBad

    report['BadJobs'] = self.bad_jobs
    report['ValidDuplicates'] = self.duplicates

    report['MinRun'] = self.options.min_run
    report['MaxRun'] = self.options.max_run

    return report
289 
def stageHost(self):
    """Return the CASTOR instance to use (STAGE_HOST env var, else default)."""
    host = os.environ.get('STAGE_HOST')
    return host if host is not None else 'castorcms'
293 
def listFiles(self, dir):
    """Recursively list a file or directory on castor"""
    # NOTE(review): 'resursive' looks like a typo for 'recursive', but the
    # attribute name must match whatever the option parser defines — confirm
    # against the script's option definitions before renaming either side.
    return castortools.listFiles(dir,self.options.resursive)
297 
def listRootFiles(self, dir):
    """Return only the ROOT files (case-insensitive '.root' suffix) under `dir`."""
    everything = self.listFiles(dir)
    return [name for name in everything if name.lower().endswith('.root')]
301 
def sortByBaseDir(self, files):
    """Group full paths by directory: returns {dirname: [basenames...]}."""
    grouped = {}
    for path in files:
        head, tail = os.path.split(path)
        grouped.setdefault(head, []).append(tail)
    return grouped
311 
312 
def getParseNumberOfEvents(self, output):
    """Parse the output of edmFileUtil to get the number of events found.

    Expects the event count to be the fourth-from-last space-separated
    token; returns -2 when the output cannot be parsed.
    """
    tokens = output.split(' ')
    result = -2
    try:
        result = int(tokens[-4])
    except (ValueError, IndexError):
        # IndexError: fewer than four tokens (empty/truncated output) used
        # to propagate out of here; treat it as "could not parse" too.
        pass
    return result
322 
def testFile(self, lfn):
    """Run edmFileUtil on `lfn` and return (ok, n_events).

    Returns (False, -1) if the tool's output contains a known error
    signature, otherwise (True, parsed event count).
    """
    # universal_newlines=True makes communicate() return str on Python 3
    # (it returns bytes otherwise, which breaks the substring checks below).
    proc = subprocess.Popen(['edmFileUtil', lfn],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    stdout, _ = proc.communicate()
    for error in ["Fatal Root Error", "Could not open file", "Not a valid collection"]:
        if error in stdout:
            return (False, -1)
    return (True, self.getParseNumberOfEvents(stdout))
328 
def testFileTimeOut(self,lfn, timeout):
    """Run testFile(lfn), aborting after `timeout` seconds.

    timeout <= 0 disables the timeout and calls testFile directly.
    Returns (False, -1) when the timeout fires.
    """
    # timed_out is a project decorator (timeout.py); presumably it raises
    # TimedOutExc inside the running frame (signal based), which is why the
    # except clause inside tf can catch it -- TODO confirm against timeout.py.
    @timed_out(timeout)
    def tf(lfn):
        try:
            return self.testFile(lfn)
        except TimedOutExc as e:
            print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
            return (False,-1)
    if timeout > 0:
        return tf(lfn)
    else:
        return self.testFile(lfn)
341 
342 
343 
if __name__ == '__main__':
    # Smoke test: publish a minimal report and read it straight back.
    publisher = PublishToFileSystem('Test')
    test_report = {'DateCreated': '123456',
                   'PathList': ['/store/cmst3/user/wreece']}
    publisher.publish(test_report)
    print(publisher.get('/store/cmst3/user/wreece'))
def read(self, lfn, local=False)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:65
static void * communicate(void *obj)
Definition: DQMNet.cc:1251
def __init__(self, dataset, options)
T min(T a, T b)
Definition: MathUtil.h:58
def timed_out(timeout)
Definition: timeout.py:23
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def test(self, previous=None, timeout=-1)
#define str(s)
def testFileTimeOut(self, lfn, timeout)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run