CMS 3D CMS Logo

RootEmbeddedFileSequence.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "EmbeddedRootSource.h"
4 #include "InputFile.h"
5 #include "RootFile.h"
7 #include "RootTree.h"
8 
19 
20 #include "CLHEP/Random/RandFlat.h"
21 
22 #include <random>
23 #include <algorithm>
24 #include <atomic>
25 
namespace {
  // File-scope counter of secondary input files that were skipped (they could not be opened or
  // contained no events). It is compared against maxFileSkips_ to bound the number of retries.
  std::atomic<unsigned int> badFilesSkipped_{0};

  // User-defined literal producing a std::size_t (stand-in for the `uz` suffix that arrives in C++23).
  // The suffix is written directly after the quotes because the `operator"" _uz` spelling, with
  // whitespace before the suffix, is deprecated. The explicit cast replaces brace-initialisation,
  // which is a narrowing error on platforms where std::size_t is narrower than unsigned long long.
  constexpr auto operator""_uz(unsigned long long i) -> std::size_t { return static_cast<std::size_t>(i); }
}  // namespace
30 
31 namespace edm {
32  class EventPrincipal;
33 
38  input_(input),
39  orderedProcessHistoryIDs_(),
40  sequential_(pset.getUntrackedParameter<bool>("sequential", false)),
41  sameLumiBlock_(pset.getUntrackedParameter<bool>("sameLumiBlock", false)),
42  fptr_(nullptr),
43  eventsRemainingInFile_(0),
44  // The default value provided as the second argument to the getUntrackedParameter function call
45  // is not used when the ParameterSet has been validated and the parameters are not optional
46  // in the description. This is currently true when PoolSource is the primary input source.
47  // The modules that use PoolSource as a SecSource have not defined their fillDescriptions function
48  // yet, so the ParameterSet does not get validated yet. As soon as all the modules with a SecSource
49  // have defined descriptions, the defaults in the getUntrackedParameter function calls can
50  // and should be deleted from the code.
51  initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents", 0U)),
52  treeCacheSize_(pset.getUntrackedParameter<unsigned int>("cacheSize", roottree::defaultCacheSize)),
53  enablePrefetching_(false),
54  enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName", false)),
// maxFileSkips defaults to the smaller of 3 and the number of input files.
55  maxFileSkips_(pset.getUntrackedParameter<unsigned int>("maxFileSkips", std::min(3_uz, numberOfFiles()))) {
// Constructor body. The signature line and several interior statements (the ones carrying
// hyperlinks in the rendered listing) are absent from this text; gaps are flagged below.
56  if (noFiles()) {
// The start of this statement (original line 57) is not shown in this listing.
58  << "RootEmbeddedFileSequence no input files specified for secondary input source.\n";
59  }
60  //
61  // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
// The declaration of pSLC (original line 62) is not shown in this listing.
63  if (pSLC.isAvailable()) {
// The site value replaces the configured cache size only when the configured size is nonzero
// and the site actually provides a value.
64  if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
65  treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
66  }
67  enablePrefetching_ = pSLC->enablePrefetching();
68  }
69 
70  // Set the pointer to the function that reads an event.
// The four assignments selected by (sameLumiBlock_, sequential_) sit on original lines
// 73, 75, 79 and 81, which are not shown in this listing.
71  if (sameLumiBlock_) {
72  if (sequential_) {
74  } else {
76  }
77  } else {
78  if (sequential_) {
80  } else {
82  }
83  }
84 
85  // For the secondary input source we do not stage in.
86  if (sequential_) {
87  // We open the first file
88  if (!atFirstFile()) {
// Original line 89 is not shown in this listing.
90  initFile(false);
91  }
92  assert(rootFile());
93  rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
94  if (!sameLumiBlock_) {
// Original line 95 is not shown; presumably it applies initialNumberOfEventsToSkip_ - TODO confirm.
96  }
97  } else {
98  // We randomly choose the first file to open.
99  // We cannot use the random number service yet.
// NOTE(review): neither the open nor the read is checked. If either fails, `seed` is used
// uninitialized on the next line.
100  std::ifstream f("/dev/urandom");
101  unsigned int seed;
102  f.read(reinterpret_cast<char*>(&seed), sizeof(seed));
103  std::default_random_engine dre(seed);
104  std::uniform_int_distribution<int> distribution(0, numberOfFiles() - 1);
// Keep drawing file indices until a file is open or the skip budget is used up.
105  while (!rootFile() && badFilesSkipped_ < maxFileSkips_) {
106  int offset = distribution(dre);
// Original lines 107-108 are not shown; presumably they position the sequence at `offset`
// and try to open that file - TODO confirm.
109  if (not rootFile()) {
110  ++badFilesSkipped_;
111  }
112  }
113  }
// With a file open, merge its product list into the input's product registry; otherwise give up.
114  if (rootFile()) {
115  input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
116  } else {
117  throw Exception(errors::FileOpenError) << "RootEmbeddedFileSequence::RootEmbeddedFileSequence(): "
118  << " input file retries exhausted.\n";
119  }
120  }
121 
123 
125 
127  // delete the RootFile object.
128  if (rootFile()) {
129  rootFile().reset();
130  }
131  }
132 
// Presumably the body of initFile_(bool skipBadFiles); the signature line is not shown in this
// listing. Forwards to initTheFile() with deleteIndexIntoFile = false, no InputSource, the input
// type name "mixingFiles" and InputType::SecondarySource.
134  initTheFile(skipBadFiles, false, nullptr, "mixingFiles", InputType::SecondarySource);
135  }
136 
138  std::shared_ptr<InputFile> filePtr) {
// Builds a shared RootFile around filePtr. Most of the constructor arguments (original lines
// 141, 145-146, 148-151 and 153-156) are not shown in this listing, so the argument list below
// is incomplete as displayed.
139  size_t currentIndexIntoFile = sequenceNumberOfFile();
140  return std::make_shared<RootFile>(fileNames()[0],
142  logicalFileName(),
143  filePtr,
144  input_.nStreams(),
147  input_.runHelper(),
152  currentIndexIntoFile,
157  }
158 
// Body of skipEntries(unsigned int offset); the signature line is not shown in this listing.
// Skips entries in the current file and, while the skip is not complete, moves to the following
// file (wrapping back to the first file after the last) and continues skipping there.
160  // offset is decremented by the number of events actually skipped.
161  bool completed = rootFile()->skipEntries(offset);
162  while (!completed) {
163  setAtNextFile();
// Past the last file: wrap around to the first one.
164  if (noMoreFiles()) {
165  setAtFirstFile();
166  }
167  initFile(false);
168  assert(rootFile());
// Reset the entry position in the newly opened file before skipping further.
169  rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
170  completed = rootFile()->skipEntries(offset);
171  }
172  }
173 
175  EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine*, EventID const*, bool recycleFiles) {
// Reads the next event, in file order, into `cache`.
// When the current file has no further event, moves to the next file; after the last file it
// wraps to the first file only if recycleFiles is true, otherwise returns false.
// On success sets fileNameHash to lfnHash() and returns true. The engine and EventID
// arguments are unused.
176  assert(rootFile());
177  bool found = rootFile()->nextEventEntry();
178  if (found) {
// Only the first element of the returned pair is used.
179  auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
180  found = found2;
181  }
182  if (!found) {
183  setAtNextFile();
184  if (noMoreFiles()) {
185  if (recycleFiles) {
186  setAtFirstFile();
187  } else {
188  return false;
189  }
190  }
191  initFile(false);
192  assert(rootFile());
193  rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
// Retry in the newly opened file.
194  return readOneSequential(cache, fileNameHash, nullptr, nullptr, recycleFiles);
195  }
196  fileNameHash = lfnHash();
197  return true;
198  }
199 
201  EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine*, EventID const* idp, bool recycleFiles) {
// Reads the next event belonging to the run and luminosity block of *idp into `cache`.
// Returns false when no such event can be located; on success sets fileNameHash to lfnHash()
// and returns true. idp must be non-null; the engine argument is unused.
202  assert(idp);
203  EventID const& id = *idp;
// `offset` is declared on original lines 204-205, which are not shown in this listing.
// While it is positive, one event is read (via a recursive call) and discarded per unit.
206  if (offset > 0) {
207  assert(rootFile());
208  while (offset > 0) {
209  bool found = readOneSequentialWithID(cache, fileNameHash, nullptr, idp, recycleFiles);
210  if (!found) {
211  return false;
212  }
213  --offset;
214  }
215  }
216  assert(rootFile());
// If the file sequence is exhausted, or the current position is in a different run/lumi,
// reposition at the requested run/lumi.
217  if (noMoreFiles() || rootFile()->indexIntoFileIter().run() != id.run() ||
218  rootFile()->indexIntoFileIter().lumi() != id.luminosityBlock()) {
219  bool found = skipToItem(id.run(), id.luminosityBlock(), 0, 0, false);
220  if (!found) {
221  return false;
222  }
223  }
224  assert(rootFile());
225  bool found = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
226  if (found) {
227  auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
228  found = found2;
229  }
230  if (!found) {
// No further event for this lumi here: look for the lumi in a new file and retry.
231  found = skipToItemInNewFile(id.run(), id.luminosityBlock(), 0);
232  if (!found) {
233  return false;
234  }
235  return readOneSequentialWithID(cache, fileNameHash, nullptr, idp, recycleFiles);
236  }
237  fileNameHash = lfnHash();
238  return true;
239  }
240 
242  size_t& fileNameHash,
// Reads the event identified by idx.eventID() into `cache`. `cache` and `idx` are declared on
// signature lines (original 241 and 243) that are not shown in this listing.
// Throws errors::NotFound when skipToItem() cannot locate the event.
244  EventID const& id = idx.eventID();
245  bool found = skipToItem(id.run(), id.luminosityBlock(), id.event(), idx.fileNameHash());
246  if (!found) {
247  throw Exception(errors::NotFound) << "RootEmbeddedFileSequence::readOneSpecified(): Secondary Input files"
248  << " do not contain specified event:\n"
249  << id << " in file id " << idx.fileNameHash() << "\n";
250  }
251  assert(rootFile());
252  auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
253  found = found2;
254  assert(found);
// Report the hash carried by idx, falling back to lfnHash() when it is zero.
255  fileNameHash = idx.fileNameHash();
256  if (fileNameHash == 0U) {
257  fileNameHash = lfnHash();
258  }
259  }
260 
262  EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine* engine, EventID const*, bool) {
// Reads one event into `cache`, choosing a new random file and a random starting entry whenever
// eventsRemainingInFile_ is zero. Sets fileNameHash to lfnHash() and returns true.
// Throws errors::FileOpenError when the skip budget (maxFileSkips_) is exhausted, and
// errors::NotFound when a chosen file has no events and input_.skipBadFiles() is false.
// `engine` must be non-null; the EventID and bool arguments are unused.
263  assert(rootFile());
264  assert(engine);
265  unsigned int currentSeqNumber = sequenceNumberOfFile();
266  while (eventsRemainingInFile_ == 0) {
267  bool opened{false};
268  while (!opened && badFilesSkipped_ < maxFileSkips_) {
269  unsigned int newSeqNumber = CLHEP::RandFlat::shootInt(engine, fileCatalogItems().size());
270  setAtFileSequenceNumber(newSeqNumber);
271  if (newSeqNumber != currentSeqNumber) {
// Original line 272 is not shown in this listing; presumably it opens the newly selected
// file - TODO confirm.
273  currentSeqNumber = newSeqNumber;
274  }
275  if (rootFile()) {
276  eventsRemainingInFile_ = rootFile()->eventTree().entries();
277  if (eventsRemainingInFile_ == 0) {
// An empty file is fatal unless bad files may be skipped; otherwise warn and count it.
278  if (!input_.skipBadFiles()) {
279  throw Exception(errors::NotFound) << "RootEmbeddedFileSequence::readOneRandom(): Secondary Input file "
280  << fileNames()[0] << " contains no events.\n";
281  }
282  LogWarning("RootEmbeddedFileSequence") << "RootEmbeddedFileSequence::readOneRandom(): Secondary Input file "
283  << fileNames()[0] << " contains no events and will be skipped.\n";
284  ++badFilesSkipped_;
285  } else {
286  opened = true;
287  }
288  } else {
// NOTE(review): currentSeqNumber was set equal to newSeqNumber above whenever the two
// differed, so this comparison looks like it can never be true and the counter is never
// incremented on this path - confirm against the line missing at 272.
289  if (newSeqNumber != currentSeqNumber) {
290  ++badFilesSkipped_;
291  }
292  }
293  }
294  if (not opened) {
295  throw Exception(errors::FileOpenError) << "RootEmbeddedFileSequence::readOneRandom(): "
296  << " input file retries exhausted.\n";
297  }
// Position one entry before a randomly drawn entry; presumably the nextEventEntry() call
// below then lands on the drawn entry - TODO confirm.
298  rootFile()->setAtEventEntry(CLHEP::RandFlat::shootInt(engine, eventsRemainingInFile_) - 1);
299  }
300  rootFile()->nextEventEntry();
301 
302  auto [found, succeeded] = rootFile()->readCurrentEvent(cache);
303  if (!found) {
// Nothing at the current position: restart from entry 0 of the same file.
304  rootFile()->setAtEventEntry(0);
305  auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
306  assert(found2);
307  }
308  fileNameHash = lfnHash();
// Original line 309 is not shown in this listing.
310  return true;
311  }
312 
314  size_t& fileNameHash,
315  CLHEP::HepRandomEngine* engine,
316  EventID const* idp,
317  bool recycleFiles) {
// Reads an event from the run and luminosity block of *idp into `cache` (declared on a signature
// line not shown in this listing). When the current position is not already inside that lumi,
// the starting event within the lumi is chosen at random; later calls continue from there.
// Returns false if the lumi cannot be found; on success sets fileNameHash to lfnHash().
// Both `engine` and `idp` must be non-null.
318  assert(engine);
319  assert(idp);
320  EventID const& id = *idp;
321  if (noMoreFiles() || !rootFile() || rootFile()->indexIntoFileIter().run() != id.run() ||
322  rootFile()->indexIntoFileIter().lumi() != id.luminosityBlock()) {
323  bool found = skipToItem(id.run(), id.luminosityBlock(), 0);
324  if (!found) {
325  return false;
326  }
// First pass: count the events in this lumi by stepping through them.
327  int eventsInLumi = 0;
328  assert(rootFile());
329  while (rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock()))
330  ++eventsInLumi;
// Reposition at the lumi, then step forward by a randomly drawn number of events.
331  found = skipToItem(id.run(), id.luminosityBlock(), 0);
332  assert(found);
333  int eventInLumi = CLHEP::RandFlat::shootInt(engine, eventsInLumi);
334  for (int i = 0; i < eventInLumi; ++i) {
335  bool foundEventInLumi = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
336  assert(foundEventInLumi);
337  }
338  }
339  assert(rootFile());
340  bool found = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
341  if (found) {
342  auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
343  found = found2;
344  }
345  if (!found) {
// No further event in this lumi: reposition at the lumi in the current file and retry.
346  found = rootFile()->setEntryAtItem(id.run(), id.luminosityBlock(), 0);
347  if (!found) {
348  return false;
349  }
350  return readOneRandomWithID(cache, fileNameHash, engine, idp, recycleFiles);
351  }
352  fileNameHash = lfnHash();
353  return true;
354  }
355 
357  size_t& fileNameHash,
358  CLHEP::HepRandomEngine* engine,
359  EventID const* id,
360  bool recycleFiles) {
// Entry point for reading one event: forwards every argument to the member function that
// fptr_ points to and returns its result.
// Preconditions: `id` must be non-null when sameLumiBlock_ is set, and `engine` must be
// non-null when sequential_ is not set.
361  assert(!sameLumiBlock_ || id != nullptr);
362  assert(sequential_ || engine != nullptr);
363  return (this->*fptr_)(cache, fileNameHash, engine, id, recycleFiles);
364  }
365 
// Body of fillDescription(ParameterSetDescription& desc); the signature line is not shown in this
// listing. Declares the untracked parameters read by the constructor, with their defaults and
// help text.
367  desc.addUntracked<bool>("sequential", false)
368  ->setComment(
369  "True: loopEvents() reads events sequentially from beginning of first file.\n"
370  "False: loopEvents() first reads events beginning at random event. New files also chosen randomly");
371  desc.addUntracked<bool>("sameLumiBlock", false)
372  ->setComment(
373  "True: loopEvents() reads events only in same lumi as the specified event.\n"
374  "False: loopEvents() reads events regardless of lumi.");
375  desc.addUntracked<unsigned int>("skipEvents", 0U)
376  ->setComment(
377  "Skip the first 'skipEvents' events. Used only if 'sequential' is True and 'sameLumiBlock' is False");
// No default is declared here for maxFileSkips; the constructor supplies
// min(3, number of files) when reading the parameter.
378  desc.addUntracked<unsigned int>("maxFileSkips")
379  ->setComment(
380  "How many files to try if 'sequential' is False and 'skipBadFiles' is True.\n"
381  "Defaults to 3 (or # of files if smaller).");
382  desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
383  ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
384  desc.addUntracked<bool>("enforceGUIDInFileName", false)
385  ->setComment(
386  "True: file name part is required to be equal to the GUID of the file\n"
387  "False: file name can be anything");
388  }
389 } // namespace edm
size
Write out results.
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
RootFileSharedPtr makeRootFile(std::shared_ptr< InputFile > filePtr) override
void initFile(bool skipBadFiles)
bool(RootEmbeddedFileSequence::* fptr_)(EventPrincipal &, size_t &, CLHEP::HepRandomEngine *, EventID const *, bool)
bool readOneRandomWithID(EventPrincipal &cache, size_t &fileNameHash, CLHEP::HepRandomEngine *, EventID const *id, bool)
void readOneSpecified(EventPrincipal &cache, size_t &fileNameHash, SecondaryEventIDAndFileInfo const &id)
void setAtFileSequenceNumber(size_t offset)
bool readOneSequentialWithID(EventPrincipal &cache, size_t &fileNameHash, CLHEP::HepRandomEngine *, EventID const *id, bool)
std::string const & logicalFileName() const
void initFile_(bool skipBadFiles) override
assert(be >=bs)
unsigned int const defaultCacheSize
Definition: RootTree.h:38
std::vector< std::string > const & fileNames() const
bool readOneRandom(EventPrincipal &cache, size_t &fileNameHash, CLHEP::HepRandomEngine *, EventID const *, bool)
static std::string const input
Definition: EdmProvDump.cc:50
ProductSelectorRules const & productSelectorRules() const
std::shared_ptr< RootFile > RootFileSharedPtr
bool readOneEvent(EventPrincipal &cache, size_t &fileNameHash, CLHEP::HepRandomEngine *, EventID const *id, bool recycleFiles)
bool readOneSequential(EventPrincipal &cache, size_t &fileNameHash, CLHEP::HepRandomEngine *, EventID const *, bool recycleFiles)
RunHelperBase * runHelper()
void initTheFile(bool skipBadFiles, bool deleteIndexIntoFile, InputSource *input, char const *inputTypeName, InputType inputType)
double f[11][100]
EventID const & min(EventID const &lh, EventID const &rh)
Definition: EventID.h:116
std::shared_ptr< RootFile const > rootFile() const
ProductRegistry & productRegistryUpdate()
unsigned int nStreams() const
bool skipToItem(RunNumber_t run, LuminosityBlockNumber_t lumi, EventNumber_t event, size_t fileNameHash=0U, bool currentFileFirst=true)
std::vector< FileCatalogItem > const & fileCatalogItems() const
static void fillDescription(ParameterSetDescription &desc)
static constexpr EntryNumber_t invalidEntry
bool skipToItemInNewFile(RunNumber_t run, LuminosityBlockNumber_t lumi, EventNumber_t event)
HLT enums.
def cache(function)
Definition: utilities.py:3
void updateFromInput(ProductList const &other)
std::vector< ProcessHistoryID > orderedProcessHistoryIDs_
void skipEntries(unsigned int offset)
bool isAvailable() const
Definition: Service.h:40
Log< level::Warning, false > LogWarning
RootEmbeddedFileSequence(ParameterSet const &pset, EmbeddedRootSource &input, InputFileCatalog const &catalog)
std::vector< std::shared_ptr< IndexIntoFile > > const & indexesIntoFiles() const