CMS 3D CMS Logo

RootPrimaryFileSequence.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "DuplicateChecker.h"
4 #include "InputFile.h"
5 #include "PoolSource.h"
6 #include "RootFile.h"
8 #include "RootTree.h"
9 
19 
20 namespace edm {
24  : RootInputFileSequence(pset, catalog),
25  input_(input),
26  firstFile_(true),
27  branchesMustMatch_(BranchDescription::Permissive),
28  orderedProcessHistoryIDs_(),
29  eventSkipperByID_(EventSkipperByID::create(pset).release()),
30  initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
31  noEventSort_(pset.getUntrackedParameter<bool>("noEventSort")),
32  treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize") : 0U),
33  duplicateChecker_(new DuplicateChecker(pset)),
34  usingGoToEvent_(false),
35  enablePrefetching_(false) {
36  // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
38  if (pSLC.isAvailable()) {
39  if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
40  treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
41  }
42  enablePrefetching_ = pSLC->enablePrefetching();
43  }
44 
45  std::string branchesMustMatch =
46  pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
47  if (branchesMustMatch == std::string("strict"))
49 
50  // Prestage the files
53  }
54  // Open the first file.
57  if (rootFile())
58  break;
59  }
60  if (rootFile()) {
61  input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
64  }
65  }
66  }
67 
69 
71 
72  std::unique_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
73  if (firstFile_) {
74  // The first input file has already been opened.
75  firstFile_ = false;
76  if (!rootFile()) {
78  }
79  } else {
80  if (!nextFile()) {
81  assert(0);
82  }
83  }
84  if (!rootFile()) {
85  return std::make_unique<FileBlock>();
86  }
87  return rootFile()->createFileBlock();
88  }
89 
91  // close the currently open file, if any, and delete the RootFile object.
92  if (rootFile()) {
93  auto sentry = std::make_unique<InputSource::FileCloseSentry>(input_, lfn(), usedFallback());
94  rootFile()->close();
96  duplicateChecker_->inputFileClosed();
97  rootFile().reset();
98  }
99  }
100 
102  // If we are not duplicate checking across files and we are not using random access to find events,
103  // then we can delete the IndexIntoFile for the file we are closing.
104  // If we can't delete all of it, then we can delete the parts we do not need.
105  bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() &&
106  !duplicateChecker_->checkDisabled());
107  initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
108  }
109 
111  size_t currentIndexIntoFile = sequenceNumberOfFile();
112  return std::make_shared<RootFile>(fileName(),
114  logicalFileName(),
115  filePtr,
118  remainingEvents(),
120  input_.nStreams(),
124  input_.runHelper(),
125  noEventSort_,
130  nullptr, // associationsFromSecondary
135  currentIndexIntoFile,
141  }
142 
144  if (!noMoreFiles())
145  setAtNextFile();
146  if (noMoreFiles()) {
147  return false;
148  }
149 
151 
152  if (rootFile()) {
153  // make sure the new product registry is compatible with the main one
154  std::string mergeInfo =
156  if (!mergeInfo.empty()) {
157  throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::nextFile()") << mergeInfo;
158  }
159  }
160  return true;
161  }
162 
164  if (atFirstFile()) {
165  return false;
166  }
168 
169  initFile(false);
170 
171  if (rootFile()) {
172  // make sure the new product registry is compatible to the main one
173  std::string mergeInfo =
175  if (!mergeInfo.empty()) {
176  throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::previousEvent()") << mergeInfo;
177  }
178  }
179  if (rootFile())
180  rootFile()->setToLastEntry();
181  return true;
182  }
183 
186  EventNumber_t& event) {
187  if (noMoreFiles()) {
188  return InputSource::IsStop;
189  }
190  if (firstFile_) {
191  return InputSource::IsFile;
192  }
193  if (rootFile()) {
194  IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
195  if (entryType == IndexIntoFile::kEvent) {
196  return InputSource::IsEvent;
197  } else if (entryType == IndexIntoFile::kLumi) {
198  return InputSource::IsLumi;
199  } else if (entryType == IndexIntoFile::kRun) {
200  return InputSource::IsRun;
201  }
202  assert(entryType == IndexIntoFile::kEnd);
203  }
204  if (atLastFile()) {
205  return InputSource::IsStop;
206  }
207  return InputSource::IsFile;
208  }
209 
210  // Rewind to before the first event that was read.
212  if (!atFirstFile()) {
213  closeFile_();
214  setAtFirstFile();
215  }
216  if (!rootFile()) {
217  initFile(false);
218  }
219  rewindFile();
220  firstFile_ = true;
221  if (rootFile()) {
222  if (initialNumberOfEventsToSkip_ != 0) {
224  }
225  }
226  }
227 
228  // Rewind to the beginning of the current file
230  if (rootFile())
231  rootFile()->rewind();
232  }
233 
234  // Advance "offset" events. Offset can be positive or negative (or zero).
236  assert(rootFile());
237  while (offset != 0) {
238  bool atEnd = rootFile()->skipEvents(offset);
239  if ((offset > 0 || atEnd) && !nextFile()) {
240  return false;
241  }
242  if (offset < 0 && !previousFile()) {
243  setNoMoreFiles();
244  return false;
245  }
246  }
247  return true;
248  }
249 
251  usingGoToEvent_ = true;
252  if (rootFile()) {
253  if (rootFile()->goToEvent(eventID)) {
254  return true;
255  }
256  // If only one input file, give up now, to save time.
257  if (rootFile() && indexesIntoFiles().size() == 1) {
258  return false;
259  }
260  // Save the current file and position so that we can restore them
261  // if we fail to restore the desired event
262  bool closedOriginalFile = false;
263  size_t const originalFileSequenceNumber = sequenceNumberOfFile();
264  IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
265 
266  // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
267  for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
268  if (*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
269  // We found it. Close the currently open file, and open the correct one.
271  initFile(false);
272  // Now get the item from the correct file.
273  assert(rootFile());
274  bool found = rootFile()->goToEvent(eventID);
275  assert(found);
276  return true;
277  }
278  }
279  // Look for item in files not yet opened.
280  for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
281  if (!*it) {
283  initFile(false);
284  closedOriginalFile = true;
285  if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
286  assert(rootFile());
287  if (rootFile()->goToEvent(eventID)) {
288  return true;
289  }
290  }
291  }
292  }
293  if (closedOriginalFile) {
294  setAtFileSequenceNumber(originalFileSequenceNumber);
295  initFile(false);
296  assert(rootFile());
297  rootFile()->setPosition(originalPosition);
298  }
299  }
300  return false;
301  }
302 
304 
306 
308  desc.addUntracked<unsigned int>("skipEvents", 0U)
309  ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
310  desc.addUntracked<bool>("noEventSort", true)
311  ->setComment(
312  "True: Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
313  "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note "
314  "3).\n"
315  "Note 1: Events within the same lumi will always be processed contiguously.\n"
316  "Note 2: Lumis within the same run will always be processed contiguously.\n"
317  "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
318  desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
319  ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
320  std::string defaultString("permissive");
321  desc.addUntracked<std::string>("branchesMustMatch", defaultString)
322  ->setComment(
323  "'strict': Branches in each input file must match those in the first file.\n"
324  "'permissive': Branches in each input file may be any subset of those in the first file.");
325 
328  }
329 
331  if (rootFile()) {
332  if (!rootFile()->wasLastEventJustRead()) {
334  }
335  if (noMoreFiles() || atLastFile()) {
337  } else {
339  }
340  }
342  }
343 
345  if (rootFile()) {
346  if (!rootFile()->wasFirstEventJustRead()) {
348  }
349  if (!atFirstFile()) {
351  }
353  }
355  }
356 
357 } // namespace edm
RunNumber_t run() const
Definition: EventID.h:39
size
Write out results.
EventNumber_t event() const
Definition: EventID.h:41
T getUntrackedParameter(std::string const &, T const &) const
std::string const & logicalFileName() const
void initFile(bool skipBadFiles)
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
std::string const & fileName() const
InputSource::ItemType getNextItemType(RunNumber_t &run, LuminosityBlockNumber_t &lumi, EventNumber_t &event)
void setAtFileSequenceNumber(size_t offset)
def create(alignables, pedeDump, additionalData, outputFile, config)
void initFile_(bool skipBadFiles) override
ProcessingController::ForwardState forwardState() const
unsigned long long EventNumber_t
std::vector< std::shared_ptr< IndexIntoFile > > const & indexesIntoFiles() const
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:233
static void fillDescription(ParameterSetDescription &desc)
static void fillDescription(ParameterSetDescription &desc)
std::unique_ptr< FileBlock > readFile_()
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
unsigned int const defaultCacheSize
Definition: RootTree.h:46
unsigned int LuminosityBlockNumber_t
static std::string const input
Definition: EdmProvDump.cc:48
RunHelperBase * runHelper()
Definition: PoolSource.h:47
ProcessingController::ReverseState reverseState() const
std::shared_ptr< RootFile > RootFileSharedPtr
bool goToEvent(EventID const &eventID)
static const StorageFactory * get(void)
int remainingEvents() const
Definition: InputSource.h:187
void stagein(const std::string &url) const
std::string const & lfn() const
RootFileSharedPtr makeRootFile(std::shared_ptr< InputFile > filePtr) override
void initTheFile(bool skipBadFiles, bool deleteIndexIntoFile, InputSource *input, char const *inputTypeName, InputType inputType)
std::vector< ProcessHistoryID > orderedProcessHistoryIDs_
bool isAvailable() const
Definition: Service.h:40
RootPrimaryFileSequence(ParameterSet const &pset, PoolSource &input, InputFileCatalog const &catalog)
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
unsigned int nStreams() const
Definition: PoolSource.h:44
bool dropDescendants() const
Definition: PoolSource.h:41
bool bypassVersionCheck() const
Definition: PoolSource.h:42
#define end
Definition: vmac.h:39
std::shared_ptr< RootFile const > rootFile() const
int treeMaxVirtualSize() const
Definition: PoolSource.h:45
bool skipBadFiles() const
Definition: PoolSource.h:40
std::shared_ptr< EventSkipperByID const > eventSkipperByID() const
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
bool labelRawDataLikeMC() const
Definition: PoolSource.h:43
edm::propagate_const< std::shared_ptr< DuplicateChecker > > duplicateChecker_
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
BranchDescription::MatchMode branchesMustMatch_
static void fillDescription(ParameterSetDescription &desc)
#define begin
Definition: vmac.h:32
HLT enums.
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:165
void updateFromInput(ProductList const &other)
int remainingLuminosityBlocks() const
Definition: InputSource.h:195
unsigned int RunNumber_t
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:201
std::shared_ptr< DuplicateChecker const > duplicateChecker() const
Definition: event.py:1
ProductSelectorRules const & productSelectorRules() const
Definition: PoolSource.h:46