CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
RootPrimaryFileSequence.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "DuplicateChecker.h"
4 #include "InputFile.h"
5 #include "PoolSource.h"
6 #include "RootFile.h"
8 #include "RootTree.h"
9 
20 
21 namespace edm {
// Constructor: member initializer list followed by the constructor body.
// NOTE(review): the signature (embedded lines 22-24) and several body statements
// are absent from this extract; gaps are flagged where they occur.
25  : RootInputFileSequence(pset, catalog),
26  input_(input),
27  firstFile_(true),
28  branchesMustMatch_(BranchDescription::Permissive),
29  orderedProcessHistoryIDs_(),
30  eventSkipperByID_(EventSkipperByID::create(pset).release()),
31  initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
32  noRunLumiSort_(pset.getUntrackedParameter<bool>("noRunLumiSort")),
    // When noRunLumiSort_ is true, noEventSort_ is forced to true and "noEventSort" is not read.
33  noEventSort_(noRunLumiSort_ ? true : pset.getUntrackedParameter<bool>("noEventSort")),
    // "cacheSize" is only read when noEventSort_ is true; otherwise the cache size is 0.
34  treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize") : 0U),
35  duplicateChecker_(new DuplicateChecker(pset)),
36  usingGoToEvent_(false),
37  enablePrefetching_(false),
38  enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName")) {
39  // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
    // NOTE(review): the declaration of pSLC (embedded line 40) is missing from this extract.
41  if (pSLC.isAvailable()) {
    // The site-local cache size replaces treeCacheSize_ only when treeCacheSize_ is
    // already non-zero and sourceTTreeCacheSize() tests true.
42  if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
43  treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
44  }
45  enablePrefetching_ = pSLC->enablePrefetching();
46  }
47 
48  std::string branchesMustMatch =
49  pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
50  if (branchesMustMatch == std::string("strict"))
    // NOTE(review): the statement controlled by this `if` (embedded line 51) is missing here;
    // presumably it changes branchesMustMatch_ from its Permissive default - confirm.
52 
53  // Prestage the files
    // NOTE(review): the prestaging loop (embedded lines 54-55) is missing; only its closing brace remains.
56  }
57  // Open the first file.
    // NOTE(review): the loop header and the open call (embedded lines 58-59) are missing.
    // The visible part leaves the loop as soon as a RootFile is open.
60  if (rootFile())
61  break;
62  }
63  if (rootFile()) {
    // Update the input source's product registry from the product list of the opened file.
64  input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
    // NOTE(review): embedded lines 65-66 are missing from this extract.
67  }
68  }
69  }
70 
72 
74 
// Makes the next file to be read current and returns its FileBlock; the same
// pointer is also stored in fb_ on every return path.
//
// The file is chosen by the first state that applies, checked in this order:
//   firstFile_, goToEventInNewFile_, skipIntoNewFile_, otherwise nextFile().
//
// Returns:
//   - a null shared_ptr when nextFile() returns false,
//   - a default-constructed FileBlock when no RootFile is open afterwards,
//   - otherwise the result of rootFile()->createFileBlock().
75  std::shared_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
76  std::shared_ptr<FileBlock> fileBlock;
77  if (firstFile_) {
78  firstFile_ = false;
79  // Usually the first input file will already be open
80  if (!rootFile()) {
    // NOTE(review): the statement executed when no file is open (embedded line 81)
    // is missing from this extract; presumably it opens the file - confirm.
82  }
83  } else if (goToEventInNewFile_) {
    // A previous goToEvent() located the event in another file; switch to it now.
84  goToEventInNewFile_ = false;
    // NOTE(review): embedded line 85 is missing; presumably it selects the file
    // recorded in goToFileSequenceOffset_ before initFile() - confirm.
86  initFile(false);
87  assert(rootFile());
88  bool found = rootFile()->goToEvent(goToEventID_);
89  assert(found);
90  } else if (skipIntoNewFile_) {
    // A previous skip located its target in another file; switch to it now.
91  skipIntoNewFile_ = false;
    // NOTE(review): embedded line 92 is missing; presumably it selects the target file - confirm.
93  initFile(false);
94  assert(rootFile());
    // A negative remaining offset is applied starting from the last entry of the file.
95  if (skipToOffsetInFinalFile_ < 0) {
96  rootFile()->setToLastEntry();
97  }
98  bool atEnd = rootFile()->skipEvents(skipToOffsetInFinalFile_);
    // The skip is expected to finish inside this file with nothing left over.
99  assert(!atEnd && skipToOffsetInFinalFile_ == 0);
100  } else {
101  if (!nextFile()) {
102  // handle case with last file bad and
103  // skipBadFiles true
    // fileBlock is still null here, so fb_ is set to null and null is returned.
104  fb_ = fileBlock;
105  return fileBlock;
106  }
107  }
108  if (!rootFile()) {
    // No file is open: hand back a default-constructed FileBlock.
109  fileBlock = std::make_shared<FileBlock>();
110  fb_ = fileBlock;
111  return fileBlock;
112  }
113  fileBlock = rootFile()->createFileBlock();
114  fb_ = fileBlock;
115  return fileBlock;
116  }
117 
// NOTE(review): the signature of this function (embedded line 118) is missing from
// this extract; presumably this is the closeFile_ hook - confirm.
// Does nothing when no RootFile is open.
119  // close the currently open file, if any, and delete the RootFile object.
120  if (rootFile()) {
    // The sentry is built from input_ and lfn() before close() and lives until the end of this if-block.
121  auto sentry = std::make_unique<InputSource::FileCloseSentry>(input_, lfn());
122  rootFile()->close();
    // Tell the duplicate checker, when one exists, that the input file was closed.
123  if (duplicateChecker_)
124  duplicateChecker_->inputFileClosed();
    // Drop this object's reference to the RootFile.
125  rootFile().reset();
126  }
127  }
128 
// NOTE(review): the signature (embedded line 129) is missing from this extract; the body
// uses a `skipBadFiles` value, which matches the initFile_(bool skipBadFiles) entry
// in the cross-reference list.
// Decides whether the IndexIntoFile may be deleted and forwards to initTheFile()
// with the "primaryFiles" label and InputType::Primary.
130  // If we are not duplicate checking across files and we are not using random access to find events,
131  // then we can delete the IndexIntoFile for the file we are closing.
132  // If we can't delete all of it, then we can delete the parts we do not need.
    // deleteIndexIntoFile is false when goToEvent has been used, or when a duplicate
    // checker exists that checks all files and is not disabled.
133  bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() &&
134  !duplicateChecker_->checkDisabled());
135  initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
136  }
137 
// NOTE(review): the signature (embedded line 138) and many constructor arguments are
// missing from this extract (the embedded numbering has gaps throughout the call).
// The cross-reference list shows makeRootFile(std::shared_ptr<InputFile> filePtr),
// which matches the `filePtr` used below.
// Builds a RootFile with std::make_shared from the sequence's current state and returns it.
139  size_t currentIndexIntoFile = sequenceNumberOfFile();
140  return std::make_shared<RootFile>(fileNames()[0],
142  logicalFileName(),
143  filePtr,
146  remainingEvents(),
148  input_.nStreams(),
152  input_.runHelper(),
154  noEventSort_,
158  input_.processBlockHelper().get(),
160  nullptr, // associationsFromSecondary
165  currentIndexIntoFile,
    // NOTE(review): the remaining arguments and the closing of the call (embedded lines 166-171) are missing.
172  }
173 
// NOTE(review): the signature (embedded line 174) is missing from this extract. The
// exception context string below names this function RootPrimaryFileSequence::nextFile().
// Advances through the file list until a file is open, then merges that file's
// product registry into the input source's registry.
// Returns false when there are no more files, true otherwise.
// Throws an Exception with errors::MismatchedInputFiles when merge() reports a problem.
175  do {
176  if (!noMoreFiles())
177  setAtNextFile();
178  if (noMoreFiles()) {
179  return false;
180  }
181 
    // NOTE(review): the statement that opens the file (embedded line 182) is missing here.
183  if (rootFile()) {
184  break;
185  }
186  // If we are not skipping bad files and the file
187  // open failed, then initFile should have thrown
    // NOTE(review): embedded line 188 is missing from this extract.
189  } while (true);
190 
191  // make sure the new product registry is compatible with the main one
192  std::string mergeInfo =
193  input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
    // A non-empty string returned by merge() is treated as an error and used as the exception message.
194  if (!mergeInfo.empty()) {
195  throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::nextFile()") << mergeInfo;
196  }
197  return true;
198  }
199 
// NOTE(review): the signature (embedded line 200) is missing from this extract;
// presumably this is previousFile(), which the skip logic elsewhere in this file calls - confirm.
// Returns false when already at the first file. Otherwise it opens a file, merges its
// product registry, positions it at its last entry and returns true.
201  if (atFirstFile()) {
202  return false;
203  }
    // NOTE(review): embedded line 204 is missing; presumably it moves the sequence to the previous file - confirm.
205 
206  initFile(false);
207 
208  if (rootFile()) {
209  // make sure the new product registry is compatible to the main one
210  std::string mergeInfo =
211  input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
212  if (!mergeInfo.empty()) {
    // NOTE(review): the context string names "previousEvent()", while the sibling function
    // reports its own name ("nextFile()") - verify whether this string is intended.
213  throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::previousEvent()") << mergeInfo;
214  }
215  }
    // Position at the last entry of the newly opened file.
216  if (rootFile())
217  rootFile()->setToLastEntry();
    // true is returned even when no RootFile is open after initFile().
218  return true;
219  }
220 
223  EventNumber_t& event) {
    // NOTE(review): only the tail of the signature is present (embedded lines 221-222 are
    // missing). The cross-reference list shows getNextItemType(run, lumi, event) returning
    // InputSource::ItemType.
    // Reports what kind of item comes next: stop, a new file, or an event/lumi/run
    // from the currently open file.
224  if (noMoreFiles() || skipToStop_) {
    // skipToStop_ is cleared as soon as it has been reported as a stop.
225  skipToStop_ = false;
226  return InputSource::IsStop;
227  }
    // NOTE(review): the condition guarding this return (embedded line 228) is missing from this extract.
229  return InputSource::IsFile;
230  }
231  if (rootFile()) {
    // Translate the file's next entry type into the matching InputSource item type.
232  IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
233  if (entryType == IndexIntoFile::kEvent) {
234  return InputSource::IsEvent;
235  } else if (entryType == IndexIntoFile::kLumi) {
236  return InputSource::IsLumi;
237  } else if (entryType == IndexIntoFile::kRun) {
238  return InputSource::IsRun;
239  }
    // Anything else must be the end of the file.
240  assert(entryType == IndexIntoFile::kEnd);
241  }
    // No file is open, or the current file is exhausted: stop at the last file, otherwise ask for a new file.
242  if (atLastFile()) {
243  return InputSource::IsStop;
244  }
245  return InputSource::IsFile;
246  }
247 
// NOTE(review): the signature (embedded line 249) is missing from this extract; presumably rewind_() - confirm.
248  // Rewind to before the first event that was read.
    // When not at the first file, close the current one and move the sequence back to the first file.
250  if (!atFirstFile()) {
251  closeFile();
252  setAtFirstFile();
253  }
    // Open a file if none is open at this point.
254  if (!rootFile()) {
255  initFile(false);
256  }
257  rewindFile();
    // Reset the state flags to their start-of-job values.
258  firstFile_ = true;
259  goToEventInNewFile_ = false;
260  skipIntoNewFile_ = false;
261  skipToStop_ = false;
262  if (rootFile()) {
263  if (initialNumberOfEventsToSkip_ != 0) {
    // NOTE(review): the statement executed here (embedded line 264) is missing; presumably
    // it re-applies the initial event skip - confirm.
265  }
266  }
267  }
268 
// NOTE(review): the signature (embedded line 270) is missing from this extract; presumably rewindFile() - confirm.
// Does nothing when no RootFile is open.
269  // Rewind to the beginning of the current file
271  if (rootFile())
272  rootFile()->rewind();
273  }
274 
// NOTE(review): the signature (embedded line 276) is missing from this extract, so the
// function name and the exact type of `offset` cannot be confirmed from here.
// Requires an open RootFile and a non-negative offset (both asserted).
275  // Advance "offset" events. Offset will be positive.
277  assert(rootFile());
278  assert(offset >= 0);
    // The loop re-tests offset after each skipEvents(offset) call, so that call appears
    // to update offset in place - confirm against RootFile::skipEvents.
279  while (offset != 0) {
280  bool atEnd = rootFile()->skipEvents(offset);
    // Move to the next file when events remain to be skipped or the file ended; give up if there is none.
281  if ((offset > 0 || atEnd) && !nextFile()) {
282  return;
283  }
284  }
285  }
286 
// NOTE(review): the signature (embedded line 288) is missing from this extract; presumably skipEvents(offset) - confirm.
// Outline of the visible logic:
//   1. Try the skip inside the current file; return if it completes there.
//   2. If the skip cannot be satisfied (no later or earlier file), set skipToStop_ and return.
//   3. Otherwise walk through neighbouring files to find where the skip ends, setting
//      skipIntoNewFile_ on success or skipToStop_ on failure.
//   4. Reopen the original file and restore the original position before returning.
// Requires an open RootFile (asserted).
287  // Advance "offset" events. Offset can be positive or negative (or zero).
289  assert(rootFile());
290 
291  bool atEnd = rootFile()->skipEvents(offset);
292  if (!atEnd && offset == 0) {
293  // successfully completed skip within current file
294  return;
295  }
296 
297  // Return, if without closing the current file we know the skip cannot be completed
298  skipToStop_ = false;
299  if (offset > 0 || atEnd) {
300  if (atLastFile() || noMoreFiles()) {
301  skipToStop_ = true;
302  return;
303  }
304  }
305  if (offset < 0 && atFirstFile()) {
306  skipToStop_ = true;
307  return;
308  }
309 
310  // Save the current file and position so that we can restore them
311  size_t const originalFileSequenceNumber = sequenceNumberOfFile();
312  IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
313 
    // First step away from the original file, forward or backward depending on the sign of offset.
314  if ((offset > 0 || atEnd) && !nextFile()) {
315  skipToStop_ = true; // Can only get here if skipBadFiles is true
316  }
317  if (offset < 0 && !previousFile()) {
318  skipToStop_ = true; // Can't actually get here
319  }
320 
321  if (!skipToStop_) {
    // Keep skipping file by file until the offset is used up or the files run out.
322  while (offset != 0) {
    // NOTE(review): embedded line 323 is missing from this extract.
    // This inner atEnd shadows the outer atEnd declared above.
324  bool atEnd = rootFile()->skipEvents(offset);
325  if ((offset > 0 || atEnd) && !nextFile()) {
326  skipToStop_ = true;
327  break;
328  }
329  if (offset < 0 && !previousFile()) {
330  skipToStop_ = true;
331  break;
332  }
333  }
    // The target was found in another file; readFile_() acts on skipIntoNewFile_ later.
334  if (!skipToStop_) {
335  skipIntoNewFile_ = true;
336  }
337  }
    // NOTE(review): embedded line 338 is missing from this extract.
339 
340  // Restore the original file and position
341  setAtFileSequenceNumber(originalFileSequenceNumber);
342  initFile(false);
343  assert(rootFile());
344  rootFile()->setPosition(originalPosition);
345  rootFile()->updateFileBlock(*fb_);
346  }
347 
// NOTE(review): the signature (embedded line 348) is missing from this extract. The
// cross-reference list shows bool goToEvent(EventID const& eventID), which matches the body.
// Search order for eventID:
//   1. the currently open file,
//   2. the indexes of files that are already loaded (no file is reopened),
//   3. files whose index is not loaded yet (each one is opened in turn).
// A hit in step 2 or 3 only records the target (goToEventInNewFile_, goToFileSequenceOffset_,
// goToEventID_); readFile_() performs the switch later.
// Returns true when the event was located, false otherwise, including when no RootFile is open.
    // Set unconditionally; initFile_ reads this flag when deciding whether to delete the IndexIntoFile.
349  usingGoToEvent_ = true;
350  if (rootFile()) {
351  if (rootFile()->goToEvent(eventID)) {
352  return true;
353  }
354  // If only one input file, give up now, to save time.
355  if (rootFile() && indexesIntoFiles().size() == 1) {
356  return false;
357  }
358  // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
359  for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
360  if (*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
361  goToEventInNewFile_ = true;
362  goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
363  goToEventID_ = eventID;
364  return true;
365  }
366  }
367 
368  // Save the current file and position so that we can restore them
369  bool closedOriginalFile = false;
370  size_t const originalFileSequenceNumber = sequenceNumberOfFile();
371  IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
372 
373  // Look for item in files not yet opened.
374  bool foundIt = false;
375  for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
376  if (!*it) {
    // NOTE(review): embedded line 377 is missing; presumably it selects the file at this
    // index before initFile() - confirm.
378  initFile(false);
379  assert(rootFile());
380  closedOriginalFile = true;
381  if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
    // NOTE(review): there is no break after a hit, so the loop goes on to open every
    // remaining unopened file and a later hit would overwrite goToFileSequenceOffset_.
    // Confirm that this is intended.
382  foundIt = true;
383  goToEventInNewFile_ = true;
384  goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
385  goToEventID_ = eventID;
386  }
387  }
388  }
    // If any other file was opened, reopen the original file and restore its position.
389  if (closedOriginalFile) {
390  setAtFileSequenceNumber(originalFileSequenceNumber);
391  initFile(false);
392  assert(rootFile());
393  rootFile()->setPosition(originalPosition);
394  rootFile()->updateFileBlock(*fb_);
395  }
396  return foundIt;
397  }
398  return false;
399  }
400 
402 
404 
// NOTE(review): the signature (embedded line 405) and the last statements (embedded lines
// 432-433) are missing from this extract. The cross-reference list shows
// static void fillDescription(ParameterSetDescription& desc).
// Registers this class's untracked configuration parameters on desc, each with a
// default value and a help comment.
406  desc.addUntracked<unsigned int>("skipEvents", 0U)
407  ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
408  desc.addUntracked<bool>("noEventSort", true)
409  ->setComment(
410  "True: Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
411  "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note "
412  "3).\n"
413  "Note 1: Events within the same lumi will always be processed contiguously.\n"
414  "Note 2: Lumis within the same run will always be processed contiguously.\n"
415  "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
416  desc.addUntracked<bool>("noRunLumiSort", false)
417  ->setComment(
418  "True: Process runs, lumis and events in the order they appear in the file.\n"
419  "False: Follow settings based on 'noEventSort' setting.");
    // The default cache size comes from roottree::defaultCacheSize.
420  desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
421  ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
    // "permissive" is also the fallback the constructor passes when it reads this parameter.
422  std::string defaultString("permissive");
423  desc.addUntracked<std::string>("branchesMustMatch", defaultString)
424  ->setComment(
425  "'strict': Branches in each input file must match those in the first file.\n"
426  "'permissive': Branches in each input file may be any subset of those in the first file.");
427  desc.addUntracked<bool>("enforceGUIDInFileName", false)
428  ->setComment(
429  "True: file name part is required to be equal to the GUID of the file\n"
430  "False: file name can be anything");
431 
434  }
435 
// NOTE(review): the signature (embedded line 436) and every return statement (embedded
// lines 439, 442, 444, 447) are missing from this extract; presumably this is the
// forwardState() accessor from the cross-reference list - confirm.
// Only the decision structure is visible: with a file open, the result depends first on
// whether the last event was just read, then on whether any further file remains.
437  if (rootFile()) {
438  if (!rootFile()->wasLastEventJustRead()) {
440  }
441  if (noMoreFiles() || atLastFile()) {
443  } else {
445  }
446  }
448  }
449 
// NOTE(review): the signature (embedded line 450) and every return statement (embedded
// lines 453, 456, 458, 460) are missing from this extract; presumably this is the
// reverseState() accessor from the cross-reference list - confirm.
// Only the decision structure is visible: with a file open, the result depends first on
// whether the first event was just read, then on whether this is the first file.
451  if (rootFile()) {
452  if (!rootFile()->wasFirstEventJustRead()) {
454  }
455  if (!atFirstFile()) {
457  }
459  }
461  }
462 
463 } // namespace edm
RunNumber_t run() const
Definition: EventID.h:38
EventNumber_t event() const
Definition: EventID.h:40
T getUntrackedParameter(std::string const &, T const &) const
std::string const & logicalFileName() const
void initFile(bool skipBadFiles)
std::shared_ptr< ProcessBlockHelper const > processBlockHelper() const
Accessors for processBlockHelper.
Definition: InputSource.h:150
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:328
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
InputSource::ItemType getNextItemType(RunNumber_t &run, LuminosityBlockNumber_t &lumi, EventNumber_t &event)
void setAtFileSequenceNumber(size_t offset)
std::vector< std::string > const & fileNames() const
void initFile_(bool skipBadFiles) override
ProcessingController::ForwardState forwardState() const
unsigned long long EventNumber_t
std::vector< std::shared_ptr< IndexIntoFile > > const & indexesIntoFiles() const
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:224
static void fillDescription(ParameterSetDescription &desc)
static void fillDescription(ParameterSetDescription &desc)
assert(be >=bs)
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
unsigned int const defaultCacheSize
Definition: RootTree.h:38
unsigned int LuminosityBlockNumber_t
static std::string const input
Definition: EdmProvDump.cc:47
RunHelperBase * runHelper()
Definition: PoolSource.h:47
ProcessingController::ReverseState reverseState() const
std::shared_ptr< RootFile > RootFileSharedPtr
bool goToEvent(EventID const &eventID)
int remainingEvents() const
Definition: InputSource.h:178
std::shared_ptr< FileBlock > readFile_()
std::string const & lfn() const
RootFileSharedPtr makeRootFile(std::shared_ptr< InputFile > filePtr) override
void initTheFile(bool skipBadFiles, bool deleteIndexIntoFile, InputSource *input, char const *inputTypeName, InputType inputType)
std::vector< ProcessHistoryID > orderedProcessHistoryIDs_
bool isAvailable() const
Definition: Service.h:40
RootPrimaryFileSequence(ParameterSet const &pset, PoolSource &input, InputFileCatalog const &catalog)
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
unsigned int nStreams() const
Definition: PoolSource.h:44
bool dropDescendants() const
Definition: PoolSource.h:41
bool bypassVersionCheck() const
Definition: PoolSource.h:42
list lumi
Definition: dqmdumpme.py:53
std::shared_ptr< RootFile const > rootFile() const
int treeMaxVirtualSize() const
Definition: PoolSource.h:45
bool skipBadFiles() const
Definition: PoolSource.h:40
std::shared_ptr< EventSkipperByID const > eventSkipperByID() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:144
bool labelRawDataLikeMC() const
Definition: PoolSource.h:43
edm::propagate_const< std::shared_ptr< DuplicateChecker > > duplicateChecker_
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:329
BranchDescription::MatchMode branchesMustMatch_
static void fillDescription(ParameterSetDescription &desc)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:156
string end
Definition: dataset.py:937
void updateFromInput(ProductList const &other)
int remainingLuminosityBlocks() const
Definition: InputSource.h:186
unsigned int RunNumber_t
void stagein(const std::string &url) const
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:192
std::shared_ptr< DuplicateChecker const > duplicateChecker() const
static const StorageFactory * get(void)
tuple size
Write out results.
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
ProductSelectorRules const & productSelectorRules() const
Definition: PoolSource.h:46