CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
edmIntegrityCheck.IntegrityCheck Class Reference
Inheritance diagram for edmIntegrityCheck.IntegrityCheck:

Public Member Functions

def __init__ (self, dataset, options)
 
def getParseNumberOfEvents (self, output)
 
def listFiles (self, dir)
 
def listRootFiles (self, dir)
 
def query (self)
 
def report (self)
 
def sortByBaseDir (self, files)
 
def stageHost (self)
 
def stripDuplicates (self)
 
def structured (self)
 
def test (self, previous=None, timeout=-1)
 
def testFile (self, lfn)
 
def testFileTimeOut (self, lfn, timeout)
 

Public Attributes

 dataset
 
 directory
 
 eventsSeen
 
 eventsTotal
 
 options
 
 test_result
 
 topdir
 

Detailed Description

Definition at line 74 of file edmIntegrityCheck.py.

Constructor & Destructor Documentation

◆ __init__()

def edmIntegrityCheck.IntegrityCheck.__init__ (   self,
  dataset,
  options 
)

Definition at line 76 of file edmIntegrityCheck.py.

76  def __init__(self, dataset, options):
77  if not dataset.startswith(os.sep):
78  dataset = os.sep + dataset
79 
80  self.dataset = dataset
81  self.options = options
82  self.topdir = castortools.lfnToCastor( castorBaseDir(user=options.user) )
83  self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))
84 
85  #event counters
86  self.eventsTotal = -1
87  self.eventsSeen = 0
88 
89  self.test_result = None
90 

Member Function Documentation

◆ getParseNumberOfEvents()

def edmIntegrityCheck.IntegrityCheck.getParseNumberOfEvents (   self,
  output 
)
Parse the output of edmFileUtil to get the number of events found

Definition at line 314 of file edmIntegrityCheck.py.

314  def getParseNumberOfEvents(self,output):
315  """Parse the output of edmFileUtil to get the number of events found"""
316  tokens = output.split(' ')
317  result = -2
318  try:
319  result = int(tokens[-4])
320  except ValueError:
321  pass
322  return result
323 

References edmIntegrityCheck.int.

Referenced by edmIntegrityCheck.IntegrityCheck.testFile().

◆ listFiles()

def edmIntegrityCheck.IntegrityCheck.listFiles (   self,
  dir 
)
Recursively list a file or directory on castor

Definition at line 295 of file edmIntegrityCheck.py.

295  def listFiles(self,dir):
296  """Recursively list a file or directory on castor"""
297  return castortools.listFiles(dir,self.options.resursive)
298 

References cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, and validateAlignments.ValidationJobMultiIOV.options.

Referenced by edmIntegrityCheck.IntegrityCheck.listRootFiles().

◆ listRootFiles()

def edmIntegrityCheck.IntegrityCheck.listRootFiles (   self,
  dir 
)
filter out filenames so that they only contain root files

Definition at line 299 of file edmIntegrityCheck.py.

299  def listRootFiles(self,dir):
300  """filter out filenames so that they only contain root files"""
301  return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]
302 

References edmIntegrityCheck.IntegrityCheck.listFiles().

Referenced by edmIntegrityCheck.IntegrityCheck.test().

◆ query()

def edmIntegrityCheck.IntegrityCheck.query (   self)
Query DAS to find out how many events are in the dataset

Definition at line 91 of file edmIntegrityCheck.py.

91  def query(self):
92  """Query DAS to find out how many events are in the dataset"""
93  from .production_tasks import BaseDataset
94  base = BaseDataset(self.dataset, self.options.user, self.options)
95 
96  data = None
97  output = base.run({})
98  if 'Das' in output:
99  self.options.name = output['Name']
100  data = output['Das']
101 
102  if data is None:
103  raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
104  #get the number of events in the dataset
105  self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)
106 

References edmIntegrityCheck.IntegrityCheck.dataset, genericValidation.GenericValidationData.dataset, upgradeWorkflowComponents.UpgradeFragment.dataset, DiLeptonHelp::Counts.eventsTotal, edmIntegrityCheck.IntegrityCheck.eventsTotal, StraightTrackAlignment.eventsTotal, cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, and validateAlignments.ValidationJobMultiIOV.options.

Referenced by production_tasks.BaseDataset.run(), and edmIntegrityCheck.IntegrityCheck.test().

◆ report()

def edmIntegrityCheck.IntegrityCheck.report (   self)

Definition at line 213 of file edmIntegrityCheck.py.

213  def report(self):
214 
215  if self.test_result is None:
216  self.test()
217 
218  print('DBS Dataset name: %s' % self.options.name)
219  print('Storage path: %s' % self.topdir)
220 
221  for dirname, files in self.test_result.items():
222  print('Directory: %s' % dirname)
223  for name, status in files.items():
224  fname = os.path.join(dirname, name)
225  if not fname in self.duplicates:
226  print('\t\t %s: %s' % (name, str(status)))
227  else:
228  print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
229  print('Total entries in DBS: %i' % self.eventsTotal)
230  print('Total entries in processed files: %i' % self.eventsSeen)
231  if self.eventsTotal>0:
232  print('Fraction of dataset processed: %f' % (self.eventsSeen/(1.*self.eventsTotal)))
233  else:
234  print('Total entries in DBS not determined')
235  if self.bad_jobs:
236  print("Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs]))
237 

References pat::GenericDuplicateRemover< Comparator, Arbitrator >.duplicates(), LumiList.LumiList.duplicates, edmIntegrityCheck.IntegrityCheck.eventsSeen, DiLeptonHelp::Counts.eventsTotal, edmIntegrityCheck.IntegrityCheck.eventsTotal, StraightTrackAlignment.eventsTotal, reco::HitPattern.int ::test::TestHitPattern::test(), edm::RunningAverage.int ::test_average::running_average::test(), mps_monitormerge.items, join(), cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, validateAlignments.ValidationJobMultiIOV.options, print(), str, value_test.ValueTestCase.test(), eventstfile_test.EventsTFileTestCase.test(), pat::Flags.test(), reco::PFBlock::Link.test, helper::Parser.test(), L1TkMuMantra.test(), helper::ScannerBase.test(), DiMuonHistograms.test, MiniFloatConverter::ReduceMantissaToNbitsRounding.test, XMLProcessor.test(), edmIntegrityCheck.IntegrityCheck.test(), edm::test::TestProcessor.test(), DTTFBitArray< N >.test(), BitArray< N >.test(), TwoObjectVariable< LHS, lLHS, RHS, lRHS, Calculator >::getObject.test, cond::SmallWORMDict.test::SmallWORMDict::test, edmIntegrityCheck.IntegrityCheck.test_result, and edmIntegrityCheck.IntegrityCheck.topdir.

◆ sortByBaseDir()

def edmIntegrityCheck.IntegrityCheck.sortByBaseDir (   self,
  files 
)
Sort files into directories

Definition at line 303 of file edmIntegrityCheck.py.

303  def sortByBaseDir(self,files):
304  """Sort files into directories"""
305  result = {}
306  for f in files:
307  dirname = os.path.dirname(f)
308  filename = os.path.basename(f)
309  if dirname not in result: result[dirname] = []
310  result[dirname].append(filename)
311  return result
312 
313 

References mps_setup.append.

Referenced by edmIntegrityCheck.IntegrityCheck.test().

◆ stageHost()

def edmIntegrityCheck.IntegrityCheck.stageHost (   self)
Returns the CASTOR instance to use

Definition at line 291 of file edmIntegrityCheck.py.

291  def stageHost(self):
292  """Returns the CASTOR instance to use"""
293  return os.environ.get('STAGE_HOST','castorcms')
294 

Referenced by edmIntegrityCheck.IntegrityCheck.structured().

◆ stripDuplicates()

def edmIntegrityCheck.IntegrityCheck.stripDuplicates (   self)

Definition at line 107 of file edmIntegrityCheck.py.

107  def stripDuplicates(self):
108 
109  import re
110 
111  filemask = {}
112  for dirname, files in self.test_result.items():
113  for name, status in files.items():
114  fname = os.path.join(dirname, name)
115  filemask[fname] = status
116 
117  def isCrabFile(name):
118  _, fname = os.path.split(name)
119  base, _ = os.path.splitext(fname)
120  return re.match(".*_\d+_\d+_\w+$", base) is not None, base
121  def getCrabIndex(base):
122  tokens = base.split('_')
123  if len(tokens) > 2:
124  return (int(tokens[-3]), int(tokens[-2]))
125  return None
126 
127  files = {}
128 
129  mmin = 1000000000
130  mmax = -100000000
131  for f in filemask:
132  isCrab, base = isCrabFile(f)
133  if isCrab:
134  index = getCrabIndex(base)
135  if index is not None:
136  jobid, retry = index
137 
138  mmin = min(mmin, jobid)
139  mmax = max(mmax, jobid)
140  if jobid in files and filemask[f][0]:
141  files[jobid].append((retry, f))
142  elif filemask[f][0]:
143  files[jobid] = [(retry, f)]
144 
145  good_duplicates = {}
146  bad_jobs = set()
147  sum_dup = 0
148  for i in range(mmin, mmax+1):
149  if i in files:
150  duplicates = sorted(files[i])
151 
152  fname = duplicates[-1][1]
153  if len(duplicates) > 1:
154  for d in duplicates[:-1]:
155  good_duplicates[d[1]] = filemask[d[1]][1]
156  sum_dup += good_duplicates[d[1]]
157  else:
158  bad_jobs.add(i)
159  return good_duplicates, sorted(list(bad_jobs)), sum_dup
160 

References mps_setup.append, edmIntegrityCheck.int, mps_monitormerge.items, SiStripPI.max, min(), FastTimerService_cff.range, and edmIntegrityCheck.IntegrityCheck.test_result.

Referenced by edmIntegrityCheck.IntegrityCheck.test().

◆ structured()

def edmIntegrityCheck.IntegrityCheck.structured (   self)

Definition at line 238 of file edmIntegrityCheck.py.

238  def structured(self):
239 
240  if self.test_result is None:
241  self.test()
242 
243  totalGood = 0
244  totalBad = 0
245 
246  report = {'data':{},
247  'ReportVersion':3,
248  'PrimaryDataset':self.options.name,
249  'Name':self.dataset,
250  'PhysicsGroup':'CMG',
251  'Status':'VALID',
252  'TierList':[],
253  'AlgoList':[],
254  'RunList':[],
255  'PathList':[],
256  'Topdir':self.topdir,
257  'StageHost':self.stageHost(),
258  'CreatedBy':self.options.user,
259  'DateCreated':datetime.datetime.now().strftime("%s"),
260  'Files':{}}
261 
262  for dirname, files in self.test_result.items():
263  report['PathList'].append(dirname)
264  for name, status in files.items():
265  fname = os.path.join(dirname, name)
266  report['Files'][fname] = status
267  if status[0]:
268  totalGood += 1
269  else:
270  totalBad += 1
271 
272  report['PrimaryDatasetEntries'] = self.eventsTotal
273  if self.eventsTotal>0:
274  report['PrimaryDatasetFraction'] = (self.eventsSeen/(1.*self.eventsTotal))
275  else:
276  report['PrimaryDatasetFraction'] = -1.
277  report['FilesEntries'] = self.eventsSeen
278 
279  report['FilesGood'] = totalGood
280  report['FilesBad'] = totalBad
281  report['FilesCount'] = totalGood + totalBad
282 
283  report['BadJobs'] = self.bad_jobs
284  report['ValidDuplicates'] = self.duplicates
285 
286  report['MinRun'] = self.options.min_run
287  report['MaxRun'] = self.options.max_run
288 
289  return report
290 

References mps_setup.append, edmIntegrityCheck.IntegrityCheck.dataset, genericValidation.GenericValidationData.dataset, upgradeWorkflowComponents.UpgradeFragment.dataset, pat::GenericDuplicateRemover< Comparator, Arbitrator >.duplicates(), LumiList.LumiList.duplicates, edmIntegrityCheck.IntegrityCheck.eventsSeen, DiLeptonHelp::Counts.eventsTotal, edmIntegrityCheck.IntegrityCheck.eventsTotal, StraightTrackAlignment.eventsTotal, reco::HitPattern.int ::test::TestHitPattern::test(), edm::RunningAverage.int ::test_average::running_average::test(), mps_monitormerge.items, cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, validateAlignments.ValidationJobMultiIOV.options, edmIntegrityCheck.IntegrityCheck.stageHost(), value_test.ValueTestCase.test(), eventstfile_test.EventsTFileTestCase.test(), pat::Flags.test(), reco::PFBlock::Link.test, helper::Parser.test(), L1TkMuMantra.test(), helper::ScannerBase.test(), DiMuonHistograms.test, MiniFloatConverter::ReduceMantissaToNbitsRounding.test, XMLProcessor.test(), edmIntegrityCheck.IntegrityCheck.test(), edm::test::TestProcessor.test(), DTTFBitArray< N >.test(), BitArray< N >.test(), TwoObjectVariable< LHS, lLHS, RHS, lRHS, Calculator >::getObject.test, cond::SmallWORMDict.test::SmallWORMDict::test, edmIntegrityCheck.IntegrityCheck.test_result, and edmIntegrityCheck.IntegrityCheck.topdir.

◆ test()

def edmIntegrityCheck.IntegrityCheck.test (   self,
  previous = None,
  timeout = -1 
)

Definition at line 161 of file edmIntegrityCheck.py.

161  def test(self, previous = None, timeout = -1):
162  if not castortools.fileExists(self.directory):
163  raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)
164 
165  self.query()
166 
167  test_results = {}
168 
169  #support updating to speed things up
170  prev_results = {}
171  if previous is not None:
172  for name, status in previous['Files'].items():
173  prev_results[name] = status
174 
175  filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
176  for dir, filelist in filesToTest.items():
177  filemask = {}
178  #apply a UNIX wildcard if specified
179  filtered = filelist
180  if self.options.wildcard is not None:
181  filtered = fnmatch.filter(filelist, self.options.wildcard)
182  if not filtered:
183  print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.directory), file=sys.stderr)
184 
185  count = 0
186  for ff in filtered:
187  fname = os.path.join(dir, ff)
188  lfn = castortools.castorToLFN(fname)
189 
190  #try to update from the previous result if available
191  if lfn in prev_results and prev_results[lfn][0]:
192  if self.options.printout:
193  print('[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=' ')
194  OK, num = prev_results[lfn]
195  else:
196  if self.options.printout:
197  print('[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=' ')
198  OK, num = self.testFileTimeOut(lfn, timeout)
199 
200  filemask[ff] = (OK,num)
201  if self.options.printout:
202  print((OK, num))
203  if OK:
204  self.eventsSeen += num
205  count += 1
206  test_results[castortools.castorToLFN(dir)] = filemask
207  self.test_result = test_results
208 
209  self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
210  #remove duplicate entries from the event count
211  self.eventsSeen -= sum_dup
212 

References Book.directory, edmIntegrityCheck.IntegrityCheck.directory, pat::GenericDuplicateRemover< Comparator, Arbitrator >.duplicates(), LumiList.LumiList.duplicates, edmIntegrityCheck.IntegrityCheck.eventsSeen, mps_monitormerge.items, edmIntegrityCheck.IntegrityCheck.listRootFiles(), cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, confdb.HLTProcess.options, edmIntegrityCheck.IntegrityCheck.options, Config.Process.options, validateAlignments.ValidationJobMultiIOV.options, print(), DbQuery.query, edmIntegrityCheck.IntegrityCheck.query(), confdbOfflineConverter.OfflineConverter.query(), upload_popcon.HTTP.query(), uploadConditions.HTTP.query(), edmIntegrityCheck.IntegrityCheck.sortByBaseDir(), edmIntegrityCheck.IntegrityCheck.stripDuplicates(), edmIntegrityCheck.IntegrityCheck.test_result, and edmIntegrityCheck.IntegrityCheck.testFileTimeOut().

Referenced by edmIntegrityCheck.IntegrityCheck.report(), and edmIntegrityCheck.IntegrityCheck.structured().

◆ testFile()

def edmIntegrityCheck.IntegrityCheck.testFile (   self,
  lfn 
)

Definition at line 324 of file edmIntegrityCheck.py.

324  def testFile(self,lfn):
325  stdout = subprocess.Popen(['edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0]
326  for error in ["Fatal Root Error","Could not open file","Not a valid collection"]:
327  if error in stdout: return (False,-1)
328  return (True,self.getParseNumberOfEvents(stdout))
329 

References communicate(), and edmIntegrityCheck.IntegrityCheck.getParseNumberOfEvents().

Referenced by edmIntegrityCheck.IntegrityCheck.testFileTimeOut().

◆ testFileTimeOut()

def edmIntegrityCheck.IntegrityCheck.testFileTimeOut (   self,
  lfn,
  timeout 
)

Definition at line 330 of file edmIntegrityCheck.py.

330  def testFileTimeOut(self,lfn, timeout):
331  @timed_out(timeout)
332  def tf(lfn):
333  try:
334  return self.testFile(lfn)
335  except TimedOutExc as e:
336  print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
337  return (False,-1)
338  if timeout > 0:
339  return tf(lfn)
340  else:
341  return self.testFile(lfn)
342 
343 
344 

References print(), AlignmentIORootBase.testFile(), edmIntegrityCheck.IntegrityCheck.testFile(), and timeout.timed_out().

Referenced by edmIntegrityCheck.IntegrityCheck.test().

Member Data Documentation

◆ dataset

edmIntegrityCheck.IntegrityCheck.dataset

◆ directory

edmIntegrityCheck.IntegrityCheck.directory

◆ eventsSeen

edmIntegrityCheck.IntegrityCheck.eventsSeen

◆ eventsTotal

edmIntegrityCheck.IntegrityCheck.eventsTotal

◆ options

edmIntegrityCheck.IntegrityCheck.options

◆ test_result

edmIntegrityCheck.IntegrityCheck.test_result

◆ topdir

edmIntegrityCheck.IntegrityCheck.topdir
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
min
T min(T a, T b)
Definition: MathUtil.h:58
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
castorBaseDir
Definition: castorBaseDir.py:1
eostools.listFiles
def listFiles(path, rec=False, full_info=False)
Definition: eostools.py:300
edmIntegrityCheck.int
int
Definition: edmIntegrityCheck.py:27
ctpps_dqm_sourceclient-live_cfg.test
test
Definition: ctpps_dqm_sourceclient-live_cfg.py:7
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
str
#define str(s)
Definition: TestProcessor.cc:53
communicate
static void * communicate(void *obj)
Definition: DQMNet.cc:1049
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
timeout.timed_out
def timed_out(timeout)
Definition: timeout.py:23
Exception
mps_setup.append
append
Definition: mps_setup.py:85
edmIntegrityCheck.report
report
Definition: edmIntegrityCheck.py:348
contentValuesFiles.query
query
Definition: contentValuesFiles.py:38