CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
edmIntegrityCheck.IntegrityCheck Class Reference
Inheritance diagram for edmIntegrityCheck.IntegrityCheck:

Public Member Functions

def __init__ (self, dataset, options)
 
def getParseNumberOfEvents (self, output)
 
def listFiles (self, dir)
 
def listRootFiles (self, dir)
 
def query (self)
 
def report (self)
 
def sortByBaseDir (self, files)
 
def stageHost (self)
 
def stripDuplicates (self)
 
def structured (self)
 
def test (self, previous=None, timeout=-1)
 
def testFile (self, lfn)
 
def testFileTimeOut (self, lfn, timeout)
 

Public Attributes

 dataset
 
 directory
 
 eventsSeen
 
 eventsTotal
 
 options
 
 test_result
 
 topdir
 

Detailed Description

Definition at line 75 of file edmIntegrityCheck.py.

Constructor & Destructor Documentation

◆ __init__()

def edmIntegrityCheck.IntegrityCheck.__init__ (   self,
  dataset,
  options 
)

Definition at line 77 of file edmIntegrityCheck.py.

77  def __init__(self, dataset, options):
78  if not dataset.startswith(os.sep):
79  dataset = os.sep + dataset
80 
81  self.dataset = dataset
82  self.options = options
83  self.topdir = castortools.lfnToCastor( castorBaseDir(user=options.user) )
84  self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))
85 
86  #event counters
87  self.eventsTotal = -1
88  self.eventsSeen = 0
89 
90  self.test_result = None
91 

Member Function Documentation

◆ getParseNumberOfEvents()

def edmIntegrityCheck.IntegrityCheck.getParseNumberOfEvents (   self,
  output 
)
Parse the output of edmFileUtil to get the number of events found

Definition at line 315 of file edmIntegrityCheck.py.

315  def getParseNumberOfEvents(self,output):
316  """Parse the output of edmFileUtil to get the number of events found"""
317  tokens = output.split(' ')
318  result = -2
319  try:
320  result = int(tokens[-4])
321  except ValueError:
322  pass
323  return result
324 

References edmIntegrityCheck.int.

Referenced by edmIntegrityCheck.IntegrityCheck.testFile().

◆ listFiles()

def edmIntegrityCheck.IntegrityCheck.listFiles (   self,
  dir 
)
Recursively list a file or directory on castor

Definition at line 296 of file edmIntegrityCheck.py.

296  def listFiles(self,dir):
297  """Recursively list a file or directory on castor"""
298  return castortools.listFiles(dir,self.options.resursive)
299 

References cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, and validateAlignments.ValidationJobMultiIOV.options.

Referenced by edmIntegrityCheck.IntegrityCheck.listRootFiles().

◆ listRootFiles()

def edmIntegrityCheck.IntegrityCheck.listRootFiles (   self,
  dir 
)
filter out filenames so that they only contain root files

Definition at line 300 of file edmIntegrityCheck.py.

300  def listRootFiles(self,dir):
301  """filter out filenames so that they only contain root files"""
302  return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]
303 

References edmIntegrityCheck.IntegrityCheck.listFiles().

Referenced by edmIntegrityCheck.IntegrityCheck.test().

◆ query()

def edmIntegrityCheck.IntegrityCheck.query (   self)
Query DAS to find out how many events are in the dataset

Definition at line 92 of file edmIntegrityCheck.py.

92  def query(self):
93  """Query DAS to find out how many events are in the dataset"""
94  from .production_tasks import BaseDataset
95  base = BaseDataset(self.dataset, self.options.user, self.options)
96 
97  data = None
98  output = base.run({})
99  if 'Das' in output:
100  self.options.name = output['Name']
101  data = output['Das']
102 
103  if data is None:
104  raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
105  #get the number of events in the dataset
106  self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)
107 

References edmIntegrityCheck.IntegrityCheck.dataset, genericValidation.GenericValidationData.dataset, upgradeWorkflowComponents.UpgradeFragment.dataset, edmIntegrityCheck.IntegrityCheck.eventsTotal, StraightTrackAlignment.eventsTotal, cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, and validateAlignments.ValidationJobMultiIOV.options.

Referenced by production_tasks.BaseDataset.run(), and edmIntegrityCheck.IntegrityCheck.test().

◆ report()

def edmIntegrityCheck.IntegrityCheck.report (   self)

Definition at line 214 of file edmIntegrityCheck.py.

214  def report(self):
215 
216  if self.test_result is None:
217  self.test()
218 
219  print('DBS Dataset name: %s' % self.options.name)
220  print('Storage path: %s' % self.topdir)
221 
222  for dirname, files in six.iteritems(self.test_result):
223  print('Directory: %s' % dirname)
224  for name, status in six.iteritems(files):
225  fname = os.path.join(dirname, name)
226  if not fname in self.duplicates:
227  print('\t\t %s: %s' % (name, str(status)))
228  else:
229  print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
230  print('Total entries in DBS: %i' % self.eventsTotal)
231  print('Total entries in processed files: %i' % self.eventsSeen)
232  if self.eventsTotal>0:
233  print('Fraction of dataset processed: %f' % (self.eventsSeen/(1.*self.eventsTotal)))
234  else:
235  print('Total entries in DBS not determined')
236  if self.bad_jobs:
237  print("Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs]))
238 

References pat::GenericDuplicateRemover< Comparator, Arbitrator >.duplicates(), LumiList.LumiList.duplicates, edmIntegrityCheck.IntegrityCheck.eventsSeen, edmIntegrityCheck.IntegrityCheck.eventsTotal, StraightTrackAlignment.eventsTotal, reco::HitPattern.int ::test::TestHitPattern::test(), edm::RunningAverage.int ::test_average::running_average::test(), join(), cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, validateAlignments.ValidationJobMultiIOV.options, print(), str, value_test.ValueTestCase.test(), eventstfile_test.EventsTFileTestCase.test(), pat::Flags.test(), reco::PFBlock::Link.test, helper::Parser.test(), L1TkMuMantra.test(), helper::ScannerBase.test(), DiMuonHistograms.test, MiniFloatConverter::ReduceMantissaToNbitsRounding.test, XMLProcessor.test(), edmIntegrityCheck.IntegrityCheck.test(), edm::test::TestProcessor.test(), BitArray< N >.test(), DTTFBitArray< N >.test(), TwoObjectVariable< LHS, lLHS, RHS, lRHS, Calculator >::getObject.test, cond::SmallWORMDict.test::SmallWORMDict::test, edmIntegrityCheck.IntegrityCheck.test_result, and edmIntegrityCheck.IntegrityCheck.topdir.

◆ sortByBaseDir()

def edmIntegrityCheck.IntegrityCheck.sortByBaseDir (   self,
  files 
)
Sort files into directories

Definition at line 304 of file edmIntegrityCheck.py.

304  def sortByBaseDir(self,files):
305  """Sort files into directories"""
306  result = {}
307  for f in files:
308  dirname = os.path.dirname(f)
309  filename = os.path.basename(f)
310  if dirname not in result: result[dirname] = []
311  result[dirname].append(filename)
312  return result
313 
314 

References mps_setup.append.

Referenced by edmIntegrityCheck.IntegrityCheck.test().

◆ stageHost()

def edmIntegrityCheck.IntegrityCheck.stageHost (   self)
Returns the CASTOR instance to use

Definition at line 292 of file edmIntegrityCheck.py.

292  def stageHost(self):
293  """Returns the CASTOR instance to use"""
294  return os.environ.get('STAGE_HOST','castorcms')
295 

Referenced by edmIntegrityCheck.IntegrityCheck.structured().

◆ stripDuplicates()

def edmIntegrityCheck.IntegrityCheck.stripDuplicates (   self)

Definition at line 108 of file edmIntegrityCheck.py.

108  def stripDuplicates(self):
109 
110  import re
111 
112  filemask = {}
113  for dirname, files in six.iteritems(self.test_result):
114  for name, status in six.iteritems(files):
115  fname = os.path.join(dirname, name)
116  filemask[fname] = status
117 
118  def isCrabFile(name):
119  _, fname = os.path.split(name)
120  base, _ = os.path.splitext(fname)
121  return re.match(".*_\d+_\d+_\w+$", base) is not None, base
122  def getCrabIndex(base):
123  tokens = base.split('_')
124  if len(tokens) > 2:
125  return (int(tokens[-3]), int(tokens[-2]))
126  return None
127 
128  files = {}
129 
130  mmin = 1000000000
131  mmax = -100000000
132  for f in filemask:
133  isCrab, base = isCrabFile(f)
134  if isCrab:
135  index = getCrabIndex(base)
136  if index is not None:
137  jobid, retry = index
138 
139  mmin = min(mmin, jobid)
140  mmax = max(mmax, jobid)
141  if jobid in files and filemask[f][0]:
142  files[jobid].append((retry, f))
143  elif filemask[f][0]:
144  files[jobid] = [(retry, f)]
145 
146  good_duplicates = {}
147  bad_jobs = set()
148  sum_dup = 0
149  for i in range(mmin, mmax+1):
150  if i in files:
151  duplicates = sorted(files[i])
152 
153  fname = duplicates[-1][1]
154  if len(duplicates) > 1:
155  for d in duplicates[:-1]:
156  good_duplicates[d[1]] = filemask[d[1]][1]
157  sum_dup += good_duplicates[d[1]]
158  else:
159  bad_jobs.add(i)
160  return good_duplicates, sorted(list(bad_jobs)), sum_dup
161 

References mps_setup.append, edmIntegrityCheck.int, SiStripPI.max, min(), FastTimerService_cff.range, and edmIntegrityCheck.IntegrityCheck.test_result.

Referenced by edmIntegrityCheck.IntegrityCheck.test().

◆ structured()

def edmIntegrityCheck.IntegrityCheck.structured (   self)

Definition at line 239 of file edmIntegrityCheck.py.

239  def structured(self):
240 
241  if self.test_result is None:
242  self.test()
243 
244  totalGood = 0
245  totalBad = 0
246 
247  report = {'data':{},
248  'ReportVersion':3,
249  'PrimaryDataset':self.options.name,
250  'Name':self.dataset,
251  'PhysicsGroup':'CMG',
252  'Status':'VALID',
253  'TierList':[],
254  'AlgoList':[],
255  'RunList':[],
256  'PathList':[],
257  'Topdir':self.topdir,
258  'StageHost':self.stageHost(),
259  'CreatedBy':self.options.user,
260  'DateCreated':datetime.datetime.now().strftime("%s"),
261  'Files':{}}
262 
263  for dirname, files in six.iteritems(self.test_result):
264  report['PathList'].append(dirname)
265  for name, status in six.iteritems(files):
266  fname = os.path.join(dirname, name)
267  report['Files'][fname] = status
268  if status[0]:
269  totalGood += 1
270  else:
271  totalBad += 1
272 
273  report['PrimaryDatasetEntries'] = self.eventsTotal
274  if self.eventsTotal>0:
275  report['PrimaryDatasetFraction'] = (self.eventsSeen/(1.*self.eventsTotal))
276  else:
277  report['PrimaryDatasetFraction'] = -1.
278  report['FilesEntries'] = self.eventsSeen
279 
280  report['FilesGood'] = totalGood
281  report['FilesBad'] = totalBad
282  report['FilesCount'] = totalGood + totalBad
283 
284  report['BadJobs'] = self.bad_jobs
285  report['ValidDuplicates'] = self.duplicates
286 
287  report['MinRun'] = self.options.min_run
288  report['MaxRun'] = self.options.max_run
289 
290  return report
291 

References mps_setup.append, edmIntegrityCheck.IntegrityCheck.dataset, genericValidation.GenericValidationData.dataset, upgradeWorkflowComponents.UpgradeFragment.dataset, pat::GenericDuplicateRemover< Comparator, Arbitrator >.duplicates(), LumiList.LumiList.duplicates, edmIntegrityCheck.IntegrityCheck.eventsSeen, edmIntegrityCheck.IntegrityCheck.eventsTotal, StraightTrackAlignment.eventsTotal, reco::HitPattern.int ::test::TestHitPattern::test(), edm::RunningAverage.int ::test_average::running_average::test(), cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, validateAlignments.ValidationJobMultiIOV.options, edmIntegrityCheck.IntegrityCheck.stageHost(), value_test.ValueTestCase.test(), eventstfile_test.EventsTFileTestCase.test(), pat::Flags.test(), reco::PFBlock::Link.test, helper::Parser.test(), L1TkMuMantra.test(), helper::ScannerBase.test(), DiMuonHistograms.test, MiniFloatConverter::ReduceMantissaToNbitsRounding.test, XMLProcessor.test(), edmIntegrityCheck.IntegrityCheck.test(), edm::test::TestProcessor.test(), BitArray< N >.test(), DTTFBitArray< N >.test(), TwoObjectVariable< LHS, lLHS, RHS, lRHS, Calculator >::getObject.test, cond::SmallWORMDict.test::SmallWORMDict::test, edmIntegrityCheck.IntegrityCheck.test_result, and edmIntegrityCheck.IntegrityCheck.topdir.

◆ test()

def edmIntegrityCheck.IntegrityCheck.test (   self,
  previous = None,
  timeout = -1 
)

Definition at line 162 of file edmIntegrityCheck.py.

162  def test(self, previous = None, timeout = -1):
163  if not castortools.fileExists(self.directory):
164  raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)
165 
166  self.query()
167 
168  test_results = {}
169 
170  #support updating to speed things up
171  prev_results = {}
172  if previous is not None:
173  for name, status in six.iteritems(previous['Files']):
174  prev_results[name] = status
175 
176  filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
177  for dir, filelist in six.iteritems(filesToTest):
178  filemask = {}
179  #apply a UNIX wildcard if specified
180  filtered = filelist
181  if self.options.wildcard is not None:
182  filtered = fnmatch.filter(filelist, self.options.wildcard)
183  if not filtered:
184  print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.directory), file=sys.stderr)
185 
186  count = 0
187  for ff in filtered:
188  fname = os.path.join(dir, ff)
189  lfn = castortools.castorToLFN(fname)
190 
191  #try to update from the previous result if available
192  if lfn in prev_results and prev_results[lfn][0]:
193  if self.options.printout:
194  print('[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=' ')
195  OK, num = prev_results[lfn]
196  else:
197  if self.options.printout:
198  print('[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=' ')
199  OK, num = self.testFileTimeOut(lfn, timeout)
200 
201  filemask[ff] = (OK,num)
202  if self.options.printout:
203  print((OK, num))
204  if OK:
205  self.eventsSeen += num
206  count += 1
207  test_results[castortools.castorToLFN(dir)] = filemask
208  self.test_result = test_results
209 
210  self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
211  #remove duplicate entries from the event count
212  self.eventsSeen -= sum_dup
213 

References Book.directory, edmIntegrityCheck.IntegrityCheck.directory, pat::GenericDuplicateRemover< Comparator, Arbitrator >.duplicates(), LumiList.LumiList.duplicates, edmIntegrityCheck.IntegrityCheck.eventsSeen, edmIntegrityCheck.IntegrityCheck.listRootFiles(), cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, validateAlignments.ValidationJobMultiIOV.options, print(), DbQuery.query, edmIntegrityCheck.IntegrityCheck.query(), confdbOfflineConverter.OfflineConverter.query(), upload_popcon.HTTP.query(), uploadConditions.HTTP.query(), edmIntegrityCheck.IntegrityCheck.sortByBaseDir(), edmIntegrityCheck.IntegrityCheck.stripDuplicates(), edmIntegrityCheck.IntegrityCheck.test_result, and edmIntegrityCheck.IntegrityCheck.testFileTimeOut().

Referenced by edmIntegrityCheck.IntegrityCheck.report(), and edmIntegrityCheck.IntegrityCheck.structured().

◆ testFile()

def edmIntegrityCheck.IntegrityCheck.testFile (   self,
  lfn 
)

Definition at line 325 of file edmIntegrityCheck.py.

325  def testFile(self,lfn):
326  stdout = subprocess.Popen(['edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0]
327  for error in ["Fatal Root Error","Could not open file","Not a valid collection"]:
328  if error in stdout: return (False,-1)
329  return (True,self.getParseNumberOfEvents(stdout))
330 

References communicate(), and edmIntegrityCheck.IntegrityCheck.getParseNumberOfEvents().

Referenced by edmIntegrityCheck.IntegrityCheck.testFileTimeOut().

◆ testFileTimeOut()

def edmIntegrityCheck.IntegrityCheck.testFileTimeOut (   self,
  lfn,
  timeout 
)

Definition at line 331 of file edmIntegrityCheck.py.

331  def testFileTimeOut(self,lfn, timeout):
332  @timed_out(timeout)
333  def tf(lfn):
334  try:
335  return self.testFile(lfn)
336  except TimedOutExc as e:
337  print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
338  return (False,-1)
339  if timeout > 0:
340  return tf(lfn)
341  else:
342  return self.testFile(lfn)
343 
344 
345 

References print(), AlignmentIORootBase.testFile(), edmIntegrityCheck.IntegrityCheck.testFile(), and timeout.timed_out().

Referenced by edmIntegrityCheck.IntegrityCheck.test().

Member Data Documentation

◆ dataset

edmIntegrityCheck.IntegrityCheck.dataset

◆ directory

edmIntegrityCheck.IntegrityCheck.directory

◆ eventsSeen

edmIntegrityCheck.IntegrityCheck.eventsSeen

◆ eventsTotal

edmIntegrityCheck.IntegrityCheck.eventsTotal

◆ options

edmIntegrityCheck.IntegrityCheck.options

◆ test_result

edmIntegrityCheck.IntegrityCheck.test_result

◆ topdir

edmIntegrityCheck.IntegrityCheck.topdir
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
min
T min(T a, T b)
Definition: MathUtil.h:58
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
castorBaseDir
Definition: castorBaseDir.py:1
eostools.listFiles
def listFiles(path, rec=False, full_info=False)
Definition: eostools.py:300
edmIntegrityCheck.int
int
Definition: edmIntegrityCheck.py:27
ctpps_dqm_sourceclient-live_cfg.test
test
Definition: ctpps_dqm_sourceclient-live_cfg.py:7
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
str
#define str(s)
Definition: TestProcessor.cc:52
communicate
static void * communicate(void *obj)
Definition: DQMNet.cc:1049
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
timeout.timed_out
def timed_out(timeout)
Definition: timeout.py:23
Exception
mps_setup.append
append
Definition: mps_setup.py:85
edmIntegrityCheck.report
report
Definition: edmIntegrityCheck.py:349
contentValuesFiles.query
query
Definition: contentValuesFiles.py:38