
edmIntegrityCheck.py
#!/usr/bin/env python3
"""
Classes to check that a set of ROOT files are OK and publish a report
"""
from __future__ import print_function
from __future__ import absolute_import

from builtins import range
import datetime, fnmatch, json, os, shutil, sys, tempfile, time
import subprocess

from . import eostools as castortools
from .timeout import timed_out, TimedOutExc
from .castorBaseDir import castorBaseDir
from .dataset import CMSDataset
class PublishToFileSystem(object):
    """Write a report to storage"""

    def __init__(self, parent):
        if isinstance(parent, type("")):
            self.parent = parent
        else:
            self.parent = parent.__class__.__name__

    def publish(self, report):
        """Publish a file"""
        for path in report['PathList']:
            _, name = tempfile.mkstemp('.txt', text=True)
            with open(name, 'w') as outfile:
                json.dump(report, outfile, sort_keys=True, indent=4)

            fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
            #rename the file locally - TODO: This is a potential problem
            nname = os.path.join(os.path.dirname(name), fname)
            os.rename(name, nname)

            castor_path = castortools.lfnToCastor(path)
            new_name = '%s/%s' % (castor_path, fname)
            castortools.xrdcp(nname, path)
            time.sleep(1)

            if castortools.fileExists(new_name):

                #castortools.move(old_name, new_name)
                #castortools.chmod(new_name, '644')

                print("File published: '%s'" % castortools.castorToLFN(new_name))
                os.remove(nname)
            else:
                pathhash = path.replace('/', '.')
                hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
                shutil.move(nname, hashed_name)
                print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)

    def read(self, lfn, local=False):
        """Reads a report from storage"""
        if local:
            cat = open(lfn).read()
        else:
            cat = castortools.cat(castortools.lfnToCastor(lfn))
        #print "the cat is: ", cat
        return json.loads(cat)

    def get(self, dir):
        """Finds the latest file and reads it"""
        reg = r'^%s_.*\.txt$' % self.parent
        files = castortools.matchingFiles(dir, reg)
        files = sorted([(os.path.basename(f), f) for f in files])
        if not files:
            return None
        return self.read(files[-1][1])

class IntegrityCheck(object):

    def __init__(self, dataset, options):
        if not dataset.startswith(os.sep):
            dataset = os.sep + dataset

        self.dataset = dataset
        self.options = options
        self.topdir = castortools.lfnToCastor(castorBaseDir(user=options.user))
        self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))

        #event counters
        self.eventsTotal = -1
        self.eventsSeen = 0

        self.test_result = None
    def query(self):
        """Query DAS to find out how many events are in the dataset"""
        from .production_tasks import BaseDataset
        base = BaseDataset(self.dataset, self.options.user, self.options)

        data = None
        output = base.run({})
        if 'Das' in output:
            self.options.name = output['Name']
            data = output['Das']

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
        #get the number of events in the dataset
        self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)
    def stripDuplicates(self):

        import re

        filemask = {}
        for dirname, files in self.test_result.items():
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                filemask[fname] = status

        def isCrabFile(name):
            _, fname = os.path.split(name)
            base, _ = os.path.splitext(fname)
            return re.match(r".*_\d+_\d+_\w+$", base) is not None, base
        def getCrabIndex(base):
            tokens = base.split('_')
            if len(tokens) > 2:
                return (int(tokens[-3]), int(tokens[-2]))
            return None
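        # isCrabFile/getCrabIndex expect CRAB-style output names of the form
        # <stem>_<jobid>_<retry>_<suffix>.root (e.g. a hypothetical "output_12_3_aBc.root"
        # parses as job 12, retry 3); files that do not match are ignored below.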
        files = {}

        mmin = 1000000000
        mmax = -100000000
        for f in filemask:
            isCrab, base = isCrabFile(f)
            if isCrab:
                index = getCrabIndex(base)
                if index is not None:
                    jobid, retry = index

                    mmin = min(mmin, jobid)
                    mmax = max(mmax, jobid)
                    if jobid in files and filemask[f][0]:
                        files[jobid].append((retry, f))
                    elif filemask[f][0]:
                        files[jobid] = [(retry, f)]

        good_duplicates = {}
        bad_jobs = set()
        sum_dup = 0
        for i in range(mmin, mmax+1):
            if i in files:
                duplicates = sorted(files[i])

                fname = duplicates[-1][1]
                if len(duplicates) > 1:
                    for d in duplicates[:-1]:
                        good_duplicates[d[1]] = filemask[d[1]][1]
                        sum_dup += good_duplicates[d[1]]
            else:
                bad_jobs.add(i)
        return good_duplicates, sorted(list(bad_jobs)), sum_dup
    def test(self, previous=None, timeout=-1):
        if not castortools.fileExists(self.directory):
            raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

        self.query()

        test_results = {}

        #support updating to speed things up
        prev_results = {}
        if previous is not None:
            for name, status in previous['Files'].items():
                prev_results[name] = status

        filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
        for dir, filelist in filesToTest.items():
            filemask = {}
            #apply a UNIX wildcard if specified
            filtered = filelist
            if self.options.wildcard is not None:
                filtered = fnmatch.filter(filelist, self.options.wildcard)
                if not filtered:
                    print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard, self.directory), file=sys.stderr)

            count = 0
            for ff in filtered:
                fname = os.path.join(dir, ff)
                lfn = castortools.castorToLFN(fname)

                #try to update from the previous result if available
                if lfn in prev_results and prev_results[lfn][0]:
                    if self.options.printout:
                        print('[%i/%i]\t Skipping %s...' % (count, len(filtered), fname), end=' ')
                    OK, num = prev_results[lfn]
                else:
                    if self.options.printout:
                        print('[%i/%i]\t Checking %s...' % (count, len(filtered), fname), end=' ')
                    OK, num = self.testFileTimeOut(lfn, timeout)

                filemask[ff] = (OK, num)
                if self.options.printout:
                    print((OK, num))
                if OK:
                    self.eventsSeen += num
                count += 1
            test_results[castortools.castorToLFN(dir)] = filemask
        self.test_result = test_results

        self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
        #remove duplicate entries from the event count
        self.eventsSeen -= sum_dup
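    # After test() has run, self.test_result maps each directory LFN to a dict of
    # {file basename: (ok, n_events)}, self.duplicates holds superseded-but-valid CRAB
    # retries, and self.bad_jobs lists job indices with no good output file.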
    def report(self):

        if self.test_result is None:
            self.test()

        print('DBS Dataset name: %s' % self.options.name)
        print('Storage path: %s' % self.topdir)

        for dirname, files in self.test_result.items():
            print('Directory: %s' % dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                if fname not in self.duplicates:
                    print('\t\t %s: %s' % (name, str(status)))
                else:
                    print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
        print('Total entries in DBS: %i' % self.eventsTotal)
        print('Total entries in processed files: %i' % self.eventsSeen)
        if self.eventsTotal > 0:
            print('Fraction of dataset processed: %f' % (self.eventsSeen/(1.*self.eventsTotal)))
        else:
            print('Total entries in DBS not determined')
        if self.bad_jobs:
            print("Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs]))
    def structured(self):

        if self.test_result is None:
            self.test()

        totalGood = 0
        totalBad = 0

        report = {'data':{},
                  'ReportVersion':3,
                  'PrimaryDataset':self.options.name,
                  'Name':self.dataset,
                  'PhysicsGroup':'CMG',
                  'Status':'VALID',
                  'TierList':[],
                  'AlgoList':[],
                  'RunList':[],
                  'PathList':[],
                  'Topdir':self.topdir,
                  'StageHost':self.stageHost(),
                  'CreatedBy':self.options.user,
                  'DateCreated':datetime.datetime.now().strftime("%s"),
                  'Files':{}}

        for dirname, files in self.test_result.items():
            report['PathList'].append(dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                report['Files'][fname] = status
                if status[0]:
                    totalGood += 1
                else:
                    totalBad += 1

        report['PrimaryDatasetEntries'] = self.eventsTotal
        if self.eventsTotal > 0:
            report['PrimaryDatasetFraction'] = (self.eventsSeen/(1.*self.eventsTotal))
        else:
            report['PrimaryDatasetFraction'] = -1.
        report['FilesEntries'] = self.eventsSeen

        report['FilesGood'] = totalGood
        report['FilesBad'] = totalBad
        report['FilesCount'] = totalGood + totalBad

        report['BadJobs'] = self.bad_jobs
        report['ValidDuplicates'] = self.duplicates

        report['MinRun'] = self.options.min_run
        report['MaxRun'] = self.options.max_run

        return report
    def stageHost(self):
        """Returns the CASTOR instance to use"""
        return os.environ.get('STAGE_HOST', 'castorcms')

    def listFiles(self, dir):
        """Recursively list a file or directory on castor"""
        return castortools.listFiles(dir, self.options.resursive)

    def listRootFiles(self, dir):
        """Filter the file listing so that it only contains ROOT files"""
        return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]

    def sortByBaseDir(self, files):
        """Sort files into directories"""
        result = {}
        for f in files:
            dirname = os.path.dirname(f)
            filename = os.path.basename(f)
            if dirname not in result: result[dirname] = []
            result[dirname].append(filename)
        return result
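    # sortByBaseDir example (hypothetical paths): ['/eos/a/f1.root', '/eos/a/f2.root', '/eos/b/f3.root']
    # becomes {'/eos/a': ['f1.root', 'f2.root'], '/eos/b': ['f3.root']}.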
    def getParseNumberOfEvents(self, output):
        """Parse the output of edmFileUtil to get the number of events found"""
        tokens = output.split(' ')
        result = -2
        try:
            result = int(tokens[-4])
        except ValueError:
            pass
        return result
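    # The parser above assumes the event count is the fourth-to-last space-separated
    # token of the edmFileUtil output; -2 is returned when that token is not an integer.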
    def testFile(self, lfn):
        # universal_newlines=True makes communicate() return str rather than bytes
        stdout = subprocess.Popen(['edmFileUtil', lfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True).communicate()[0]
        for error in ["Fatal Root Error", "Could not open file", "Not a valid collection"]:
            if error in stdout: return (False, -1)
        return (True, self.getParseNumberOfEvents(stdout))
    def testFileTimeOut(self, lfn, timeout):
        @timed_out(timeout)
        def tf(lfn):
            try:
                return self.testFile(lfn)
            except TimedOutExc as e:
                print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn, timeout), file=sys.stderr)
                return (False, -1)
        if timeout > 0:
            return tf(lfn)
        else:
            return self.testFile(lfn)
if __name__ == '__main__':

    pub = PublishToFileSystem('Test')
    report = {'DateCreated':'123456', 'PathList':['/store/cmst3/user/wreece']}
    pub.publish(report)
    print(pub.get('/store/cmst3/user/wreece'))
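A minimal sketch of how IntegrityCheck might be driven from another script, assuming the module can be imported from its parent package and that the options object provides the attributes read above (user, name, wildcard, printout, min_run, max_run, resursive); the dataset path, user name and import path are placeholders:

from types import SimpleNamespace
from edmIntegrityCheck import IntegrityCheck, PublishToFileSystem  # import path depends on where the package is installed

options = SimpleNamespace(user='someuser', name=None, wildcard=None, printout=True,
                          min_run=-1, max_run=-1, resursive=True)

check = IntegrityCheck('/SomeDataset/SomePath', options)  # placeholder dataset
check.test()                   # query DAS, then run edmFileUtil over every ROOT file found
check.report()                 # print a human-readable summary
structured = check.structured()
pub = PublishToFileSystem(check)
pub.publish(structured)        # writes <parent>_<DateCreated>.txt next to the checked files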