CMS 3D CMS Logo

RootPrimaryFileSequence.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "DuplicateChecker.h"
4 #include "InputFile.h"
5 #include "PoolSource.h"
6 #include "RootFile.h"
8 #include "RootTree.h"
9 
20 
21 namespace edm {
26  input_(input),
27  firstFile_(true),
28  branchesMustMatch_(BranchDescription::Permissive),
29  orderedProcessHistoryIDs_(),
30  eventSkipperByID_(EventSkipperByID::create(pset).release()),
31  initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
32  noRunLumiSort_(pset.getUntrackedParameter<bool>("noRunLumiSort")),
33  noEventSort_(noRunLumiSort_ ? true : pset.getUntrackedParameter<bool>("noEventSort")),
34  treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize") : 0U),
35  duplicateChecker_(new DuplicateChecker(pset)),
36  usingGoToEvent_(false),
37  enablePrefetching_(false),
38  enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName")) {
39  if (noRunLumiSort_ && (remainingEvents() >= 0 || remainingLuminosityBlocks() >= 0)) {
40  // There would need to be some Framework development work to allow stopping
41  // early with noRunLumiSort set true related to closing lumis and runs that
42  // were supposed to be continued but were not... We cannot have events written
43  // to output with no run or lumi written to output.
45  "Illegal to configure noRunLumiSort and limit the number of events or luminosityBlocks");
46  }
47  // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
49  if (pSLC.isAvailable()) {
50  if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
51  treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
52  }
53  enablePrefetching_ = pSLC->enablePrefetching();
54  }
55 
56  std::string branchesMustMatch =
57  pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
58  if (branchesMustMatch == std::string("strict"))
60 
61  // Prestage the files
64  }
65  // Open the first file.
68  if (rootFile())
69  break;
70  }
71  if (rootFile()) {
72  input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
75  }
76  }
77  }
78 
80 
82 
// Selects the input file to read next according to the pending-state flags
// (firstFile_, goToEventInNewFile_, skipIntoNewFile_), and returns a FileBlock for it.
// Every return path also stores the returned pointer in fb_.
//
// Returns:
//   - an empty (null) shared_ptr when nextFile() reports that no further file is available;
//   - a default-constructed FileBlock when no root file is open after the selection step;
//   - otherwise the block produced by rootFile()->createFileBlock().
//
// NOTE(review): this listing is missing its lines 89, 93 and 100 (the numbering jumps),
// so one statement in each of the first three branches is not visible here.
83  std::shared_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
84  std::shared_ptr<FileBlock> fileBlock;
85  if (firstFile_) {
    // One-shot flag: cleared here so later calls fall through to the other branches.
86  firstFile_ = false;
87  // Usually the first input file will already be open
88  if (!rootFile()) {
    // NOTE(review): the statement for the "not yet open" case (listing line 89) is absent from this extract.
90  }
91  } else if (goToEventInNewFile_) {
    // A goToEvent request located the event in a different file; consume the request
    // and reposition inside the file opened below.
92  goToEventInNewFile_ = false;
    // NOTE(review): listing line 93 is absent; presumably it selects the target file before initFile - confirm.
94  initFile(false);
95  assert(rootFile());
    // The goToEvent call is outside the assert expression, so it still runs when asserts are compiled out.
96  bool found = rootFile()->goToEvent(goToEventID_);
97  assert(found);
98  } else if (skipIntoNewFile_) {
    // A previous skip request ended in a different file; consume the request and finish the skip there.
99  skipIntoNewFile_ = false;
    // NOTE(review): listing line 100 is absent; presumably it selects the target file before initFile - confirm.
101  initFile(false);
102  assert(rootFile());
    // A negative remaining offset is applied starting from the last entry of the file.
103  if (skipToOffsetInFinalFile_ < 0) {
104  rootFile()->setToLastEntry();
105  }
    // The assert below expects this call to leave skipToOffsetInFinalFile_ at 0 without reaching the end.
106  bool atEnd = rootFile()->skipEvents(skipToOffsetInFinalFile_);
107  assert(!atEnd && skipToOffsetInFinalFile_ == 0);
108  } else {
    // Ordinary case: advance to the next file in the sequence.
109  if (!nextFile()) {
110  // handle case with last file bad and
111  // skipBadFiles true
    // fileBlock is still the empty shared_ptr declared at the top of the function.
112  fb_ = fileBlock;
113  return fileBlock;
114  }
115  }
    // No file is open after the selection step: hand back a default-constructed FileBlock.
116  if (!rootFile()) {
117  fileBlock = std::make_shared<FileBlock>();
118  fb_ = fileBlock;
119  return fileBlock;
120  }
    // Normal path: the FileBlock comes from the currently open root file.
121  fileBlock = rootFile()->createFileBlock();
122  fb_ = fileBlock;
123  return fileBlock;
124  }
125 
127  // close the currently open file, if any, and delete the RootFile object.
128  if (rootFile()) {
129  auto sentry = std::make_unique<InputSource::FileCloseSentry>(input_, lfn());
130  rootFile()->close();
131  if (duplicateChecker_)
132  duplicateChecker_->inputFileClosed();
133  rootFile().reset();
134  }
135  }
136 
138  // If we are not duplicate checking across files and we are not using random access to find events,
139  // then we can delete the IndexIntoFile for the file we are closing.
140  // If we can't delete all of it, then we can delete the parts we do not need.
141  bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() &&
142  !duplicateChecker_->checkDisabled());
143  initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
144  }
145 
147  size_t currentIndexIntoFile = sequenceNumberOfFile();
148  return std::make_shared<RootFile>(fileNames()[0],
150  logicalFileName(),
151  filePtr,
154  remainingEvents(),
156  input_.nStreams(),
160  input_.runHelper(),
162  noEventSort_,
166  input_.processBlockHelper().get(),
168  nullptr, // associationsFromSecondary
173  currentIndexIntoFile,
180  }
181 
183  do {
184  if (!noMoreFiles())
185  setAtNextFile();
186  if (noMoreFiles()) {
187  return false;
188  }
189 
191  if (rootFile()) {
192  break;
193  }
194  // If we are not skipping bad files and the file
195  // open failed, then initFile should have thrown
197  } while (true);
198 
199  // make sure the new product registry is compatible with the main one
200  std::string mergeInfo =
201  input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
202  if (!mergeInfo.empty()) {
203  throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::nextFile()") << mergeInfo;
204  }
205  return true;
206  }
207 
209  if (atFirstFile()) {
210  return false;
211  }
213 
214  initFile(false);
215 
216  if (rootFile()) {
217  // make sure the new product registry is compatible with the main one
218  std::string mergeInfo =
219  input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
220  if (!mergeInfo.empty()) {
221  throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::previousEvent()") << mergeInfo;
222  }
223  }
224  if (rootFile())
225  rootFile()->setToLastEntry();
226  return true;
227  }
228 
231  EventNumber_t& event) {
232  if (noMoreFiles() || skipToStop_) {
233  skipToStop_ = false;
234  return InputSource::IsStop;
235  }
237  return InputSource::IsFile;
238  }
239  if (rootFile()) {
240  IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
241  if (entryType == IndexIntoFile::kEvent) {
242  return InputSource::IsEvent;
243  } else if (entryType == IndexIntoFile::kLumi) {
244  return InputSource::IsLumi;
245  } else if (entryType == IndexIntoFile::kRun) {
246  return InputSource::IsRun;
247  }
248  assert(entryType == IndexIntoFile::kEnd);
249  }
250  if (atLastFile()) {
251  return InputSource::IsStop;
252  }
253  return InputSource::IsFile;
254  }
255 
256  // Rewind to before the first event that was read.
258  if (!atFirstFile()) {
259  closeFile();
260  setAtFirstFile();
261  }
262  if (!rootFile()) {
263  initFile(false);
264  }
265  rewindFile();
266  firstFile_ = true;
267  goToEventInNewFile_ = false;
268  skipIntoNewFile_ = false;
269  skipToStop_ = false;
270  if (rootFile()) {
271  if (initialNumberOfEventsToSkip_ != 0) {
273  }
274  }
275  }
276 
277  // Rewind to the beginning of the current file
279  if (rootFile())
280  rootFile()->rewind();
281  }
282 
283  // Advance "offset" events. Offset will be positive.
285  assert(rootFile());
286  assert(offset >= 0);
287  while (offset != 0) {
288  bool atEnd = rootFile()->skipEvents(offset);
289  if ((offset > 0 || atEnd) && !nextFile()) {
290  return;
291  }
292  }
293  }
294 
295  // Advance "offset" events. Offset can be positive or negative (or zero).
297  assert(rootFile());
298 
299  bool atEnd = rootFile()->skipEvents(offset);
300  if (!atEnd && offset == 0) {
301  // successfully completed skip within current file
302  return;
303  }
304 
305  // Return, if without closing the current file we know the skip cannot be completed
306  skipToStop_ = false;
307  if (offset > 0 || atEnd) {
308  if (atLastFile() || noMoreFiles()) {
309  skipToStop_ = true;
310  return;
311  }
312  }
313  if (offset < 0 && atFirstFile()) {
314  skipToStop_ = true;
315  return;
316  }
317 
318  // Save the current file and position so that we can restore them
319  size_t const originalFileSequenceNumber = sequenceNumberOfFile();
320  IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
321 
322  if ((offset > 0 || atEnd) && !nextFile()) {
323  skipToStop_ = true; // Can only get here if skipBadFiles is true
324  }
325  if (offset < 0 && !previousFile()) {
326  skipToStop_ = true; // Can't actually get here
327  }
328 
329  if (!skipToStop_) {
330  while (offset != 0) {
332  bool atEnd = rootFile()->skipEvents(offset);
333  if ((offset > 0 || atEnd) && !nextFile()) {
334  skipToStop_ = true;
335  break;
336  }
337  if (offset < 0 && !previousFile()) {
338  skipToStop_ = true;
339  break;
340  }
341  }
342  if (!skipToStop_) {
343  skipIntoNewFile_ = true;
344  }
345  }
347 
348  // Restore the original file and position
349  setAtFileSequenceNumber(originalFileSequenceNumber);
350  initFile(false);
351  assert(rootFile());
352  rootFile()->setPosition(originalPosition);
353  rootFile()->updateFileBlock(*fb_);
354  }
355 
357  usingGoToEvent_ = true;
358  if (rootFile()) {
359  if (rootFile()->goToEvent(eventID)) {
360  return true;
361  }
362  // If only one input file, give up now, to save time.
363  if (rootFile() && indexesIntoFiles().size() == 1) {
364  return false;
365  }
366  // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
367  for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
368  if (*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
369  goToEventInNewFile_ = true;
370  goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
371  goToEventID_ = eventID;
372  return true;
373  }
374  }
375 
376  // Save the current file and position so that we can restore them
377  bool closedOriginalFile = false;
378  size_t const originalFileSequenceNumber = sequenceNumberOfFile();
379  IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
380 
381  // Look for item in files not yet opened.
382  bool foundIt = false;
383  for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
384  if (!*it) {
386  initFile(false);
387  assert(rootFile());
388  closedOriginalFile = true;
389  if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
390  foundIt = true;
391  goToEventInNewFile_ = true;
392  goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
393  goToEventID_ = eventID;
394  }
395  }
396  }
397  if (closedOriginalFile) {
398  setAtFileSequenceNumber(originalFileSequenceNumber);
399  initFile(false);
400  assert(rootFile());
401  rootFile()->setPosition(originalPosition);
402  rootFile()->updateFileBlock(*fb_);
403  }
404  return foundIt;
405  }
406  return false;
407  }
408 
410 
412 
414  desc.addUntracked<unsigned int>("skipEvents", 0U)
415  ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
416  desc.addUntracked<bool>("noEventSort", true)
417  ->setComment(
418  "True: Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
419  "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note "
420  "3).\n"
421  "Note 1: Events within the same lumi will always be processed contiguously.\n"
422  "Note 2: Lumis within the same run will always be processed contiguously.\n"
423  "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
424  desc.addUntracked<bool>("noRunLumiSort", false)
425  ->setComment(
426  "True: Process runs, lumis and events in the order they appear in the file.\n"
427  "False: Follow settings based on 'noEventSort' setting.");
428  desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
429  ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
430  std::string defaultString("permissive");
431  desc.addUntracked<std::string>("branchesMustMatch", defaultString)
432  ->setComment(
433  "'strict': Branches in each input file must match those in the first file.\n"
434  "'permissive': Branches in each input file may be any subset of those in the first file.");
435  desc.addUntracked<bool>("enforceGUIDInFileName", false)
436  ->setComment(
437  "True: file name part is required to be equal to the GUID of the file\n"
438  "False: file name can be anything");
439 
442  }
443 
445  if (rootFile()) {
446  if (!rootFile()->wasLastEventJustRead()) {
448  }
449  if (noMoreFiles() || atLastFile()) {
451  } else {
453  }
454  }
456  }
457 
459  if (rootFile()) {
460  if (!rootFile()->wasFirstEventJustRead()) {
462  }
463  if (!atFirstFile()) {
465  }
467  }
469  }
470 
471 } // namespace edm
size
Write out results.
bool labelRawDataLikeMC() const
Definition: PoolSource.h:43
void initFile(bool skipBadFiles)
InputSource::ItemType getNextItemType(RunNumber_t &run, LuminosityBlockNumber_t &lumi, EventNumber_t &event)
void setAtFileSequenceNumber(size_t offset)
def create(alignables, pedeDump, additionalData, outputFile, config)
void initFile_(bool skipBadFiles) override
int remainingEvents() const
Definition: InputSource.h:178
std::string const & logicalFileName() const
unsigned long long EventNumber_t
bool skipBadFiles() const
Definition: PoolSource.h:40
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:328
static void fillDescription(ParameterSetDescription &desc)
std::shared_ptr< ProcessBlockHelper const > processBlockHelper() const
Accessors for processBlockHelper.
Definition: InputSource.h:150
static void fillDescription(ParameterSetDescription &desc)
assert(be >=bs)
unsigned int const defaultCacheSize
Definition: RootTree.h:38
unsigned int nStreams() const
Definition: PoolSource.h:44
unsigned int LuminosityBlockNumber_t
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:329
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:156
std::vector< std::string > const & fileNames() const
std::shared_ptr< DuplicateChecker const > duplicateChecker() const
static std::string const input
Definition: EdmProvDump.cc:47
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:39
std::shared_ptr< EventSkipperByID const > eventSkipperByID() const
RunHelperBase * runHelper()
Definition: PoolSource.h:47
std::shared_ptr< RootFile > RootFileSharedPtr
bool goToEvent(EventID const &eventID)
ProcessingController::ForwardState forwardState() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:144
std::shared_ptr< FileBlock > readFile_()
RootFileSharedPtr makeRootFile(std::shared_ptr< InputFile > filePtr) override
void initTheFile(bool skipBadFiles, bool deleteIndexIntoFile, InputSource *input, char const *inputTypeName, InputType inputType)
std::vector< ProcessHistoryID > orderedProcessHistoryIDs_
RootPrimaryFileSequence(ParameterSet const &pset, PoolSource &input, InputFileCatalog const &catalog)
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:192
std::shared_ptr< RootFile const > rootFile() const
void stagein(const std::string &url) const
RunNumber_t run() const
Definition: EventID.h:38
int treeMaxVirtualSize() const
Definition: PoolSource.h:45
edm::propagate_const< std::shared_ptr< DuplicateChecker > > duplicateChecker_
BranchDescription::MatchMode branchesMustMatch_
static void fillDescription(ParameterSetDescription &desc)
int remainingLuminosityBlocks() const
Definition: InputSource.h:186
HLT enums.
void updateFromInput(ProductList const &other)
ProductSelectorRules const & productSelectorRules() const
Definition: PoolSource.h:46
std::string const & lfn() const
bool isAvailable() const
Definition: Service.h:40
unsigned int RunNumber_t
bool bypassVersionCheck() const
Definition: PoolSource.h:42
static const StorageFactory * get(void)
ProcessingController::ReverseState reverseState() const
bool dropDescendants() const
Definition: PoolSource.h:41
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
EventNumber_t event() const
Definition: EventID.h:40
Definition: event.py:1
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:224
std::vector< std::shared_ptr< IndexIntoFile > > const & indexesIntoFiles() const