CMS 3D CMS Logo

RepeatingCachedRootSource.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : RepeatingCachedRootSource
5 //
6 // Implementation:
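7 // Reads a few events (repeatNEvents) from one EDM/ROOT file, caches their products, and repeats them in sequence. All cached events must come from the same Run and LuminosityBlock.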
8 //
9 // Original Author: Chris Jones
10 // Created: Mon, 15 Mar 2021 19:02:31 GMT
11 //
12 
13 // system include files
14 #include <memory>
15 
16 // user include files
25 
27 
38 
40 
41 #include "RunHelper.h"
42 #include "RootFile.h"
43 #include "InputFile.h"
44 #include "DuplicateChecker.h"
45 
46 namespace edm {
47  class RunHelperBase;
48 
50  public:
52 
53  static void fillDescriptions(ConfigurationDescriptions& descriptions);
54 
55  std::shared_ptr<WrapperBase> getProduct(unsigned int iStreamIndex,
56  BranchID const& k,
57  EDProductGetter const* ep) const;
58 
60  public:
61  RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}
62 
63  RCProductGetter const& operator=(RCProductGetter const& iOther) {
64  map_ = iOther.map_;
65  wrappers_ = iOther.wrappers_;
66  return *this;
67  }
68 
69  RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
70  std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
71  : map_(iMap), wrappers_(iWrappers) {}
72 
73  WrapperBase const* getIt(ProductID const&) const override;
74 
75  std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
76  unsigned int key) const override;
77 
78  void getThinnedProducts(ProductID const& pid,
79  std::vector<WrapperBase const*>& foundContainers,
80  std::vector<unsigned int>& keys) const override;
81 
83  unsigned int key,
84  ProductID const& thinned) const override;
85 
86  private:
87  unsigned int transitionIndex_() const override;
88 
89  std::map<edm::ProductID, size_t> const* map_;
90  std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
91  };
92 
94  public:
95  std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
97  }
98  void mergeReaders_(edm::DelayedReader*) final { assert(false); }
99  void reset_() final {}
100 
101  unsigned int m_streamIndex;
103 
105  const final {
106  return nullptr;
107  }
109  const final {
110  return nullptr;
111  }
112  };
113 
114  protected:
115  ItemTypeInfo getNextItemType() override;
116  void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
117  std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
118  void readEvent_(EventPrincipal& eventPrincipal) override;
119 
120  private:
121  std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
122  void readRun_(RunPrincipal& runPrincipal) override;
123  bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
124  void skip(int offset) override;
125  bool goToEvent_(EventID const& eventID) override;
126  void beginJob() override;
127 
128  void fillProcessBlockHelper_() override;
131 
132  std::unique_ptr<RootFile> makeRootFile(std::string const& logicalFileName,
133  std::string const& pName,
134  bool isSkipping,
135  std::shared_ptr<InputFile> filePtr,
136  std::shared_ptr<EventSkipperByID> skipper,
137  std::shared_ptr<DuplicateChecker> duplicateChecker,
138  std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles);
139 
143  std::unique_ptr<RootFile> rootFile_;
144  std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
145  std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
146  std::vector<RCProductGetter> getters_; //one per cached event
147  std::vector<EventAuxiliary> eventAuxs_;
151  std::vector<RCDelayedReader> delayedReaders_; //one per stream
152  std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
153  std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
154  std::vector<size_t> streamToCacheIndex_;
155  size_t nextEventIndex_ = 0;
157  unsigned long long eventIndex_ = 0;
158  };
159 } // namespace edm
160 
161 using namespace edm;
162 //
163 // constants, enums and typedefs
164 //
165 
166 //
167 // static data member definitions
168 //
169 
170 //
171 // constructors and destructor
172 //
174  : InputSource(pset, desc),
175  selectorRules_(pset, "inputCommands", "InputSource"),
176  runHelper_(std::make_unique<DefaultRunHelper>()),
177  cachedWrappers_(pset.getUntrackedParameter<unsigned int>("repeatNEvents")),
178  eventAuxs_(cachedWrappers_.size()),
179  provRetriever_(0),
180  delayedReaders_(desc.allocations_->numberOfStreams()),
181  streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
182  {
183  getters_.reserve(cachedWrappers_.size());
184  for (auto& cw : cachedWrappers_) {
185  getters_.emplace_back(&productIDToWrapperIndex_, &cw);
186  }
187 
188  int index = 0;
189  std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
190  iR.m_streamIndex = index++;
191  iR.m_source = this;
192  });
193  }
194  auto logicalFileName = pset.getUntrackedParameter<std::string>("fileName");
195  InputFileCatalog catalog(std::vector<std::string>(1U, logicalFileName), "");
196  auto const& physicalFileName = catalog.fileCatalogItems().front().fileNames().front();
197  auto const nEventsToSkip = pset.getUntrackedParameter<unsigned int>("skipEvents");
198  std::shared_ptr<EventSkipperByID> skipper(EventSkipperByID::create(pset).release());
199 
200  auto duplicateChecker = std::make_shared<DuplicateChecker>(pset);
201 
202  std::vector<std::shared_ptr<IndexIntoFile>> indexesIntoFiles(1);
203 
204  auto input =
205  std::make_shared<InputFile>(physicalFileName.c_str(), " Initiating request to open file ", InputType::Primary);
207  logicalFileName, physicalFileName, 0 != nEventsToSkip, input, skipper, duplicateChecker, indexesIntoFiles);
208  rootFile_->reportOpened("repeating");
209 
210  auto const& prodList = rootFile_->productRegistry()->productList();
212 
213  //setup caching
214  auto nProdsInEvent =
215  std::count_if(prodList.begin(), prodList.end(), [](auto&& iV) { return iV.second.branchType() == edm::InEvent; });
216  {
217  size_t index = 0;
218  for (auto& prod : prodList) {
219  if (prod.second.branchType() == edm::InEvent) {
220  branchIDToWrapperIndex_[prod.second.branchID()] = index++;
221  }
222  }
223  }
224  for (auto& cache : cachedWrappers_) {
225  cache.resize(nProdsInEvent);
226  }
227 }
228 
233 
234  //Thinned collection associations are not supported at this time
235  EventPrincipal eventPrincipal(productRegistry(),
237  std::make_shared<ThinnedAssociationsHelper>(),
239  nullptr);
240  {
241  RunNumber_t run = 0;
243  auto itAux = eventAuxs_.begin();
244  auto itGetter = getters_.begin();
245  for (auto& cache : cachedWrappers_) {
246  rootFile_->nextEventEntry();
247  rootFile_->readCurrentEvent(eventPrincipal);
248  auto const& aux = eventPrincipal.aux();
249  *(itAux++) = aux;
250  if (0 == run) {
251  run = aux.run();
252  lumi = aux.luminosityBlock();
253  } else {
254  if (run != aux.run()) {
255  throw cms::Exception("EventsWithDifferentRuns") << "The requested events to cache are from different Runs";
256  }
257  if (lumi != aux.luminosityBlock()) {
258  throw cms::Exception("EventsWithDifferentLuminosityBlocks")
259  << "The requested events to cache are from different LuminosityBlocks";
260  }
261  }
262  selectionIDs_ = eventPrincipal.eventSelectionIDs();
263  branchListIndexes_ = eventPrincipal.branchListIndexes();
264  {
265  auto reader = eventPrincipal.reader();
266  auto& getter = *(itGetter++);
267  for (auto const& branchToIndex : branchIDToWrapperIndex_) {
268  cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
269  }
270  }
271  }
272  for (auto const& branchToIndex : branchIDToWrapperIndex_) {
273  auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
274  productIDToWrapperIndex_[pid] = branchToIndex.second;
275  }
276  rootFile_->rewind();
277  }
278 }
279 
282  desc.setComment(
283  "Read only a few Events from one EDM/Root file, and repeat them in sequence. The Events are required to be from "
284  "the same Run and LuminosityBlock.");
285  desc.addUntracked<std::string>("fileName")->setComment("Name of file to be processed.");
286  desc.addUntracked<unsigned int>("repeatNEvents", 10U)
287  ->setComment("Number of events to read from file and then repeat in sequence.");
288  desc.addUntracked<unsigned int>("skipEvents", 0);
291 
292  descriptions.add("source", desc);
293 }
294 
295 //
296 // member functions
297 //
298 
299 std::unique_ptr<RootFile> RepeatingCachedRootSource::makeRootFile(
300  std::string const& logicalFileName,
301  std::string const& pName,
302  bool isSkipping,
303  std::shared_ptr<InputFile> filePtr,
304  std::shared_ptr<EventSkipperByID> skipper,
305  std::shared_ptr<DuplicateChecker> duplicateChecker,
306  std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles) {
307  return std::make_unique<RootFile>(pName,
309  logicalFileName,
310  filePtr,
311  skipper,
312  isSkipping,
313  remainingEvents(),
315  1,
316  roottree::defaultCacheSize, //treeCacheSize_,
317  -1, //treeMaxVirtualSize(),
318  processingMode(),
319  runHelper_,
320  false, //noRunLumiSort_
321  true, //noEventSort_,
325  processBlockHelper().get(),
327  nullptr, // associationsFromSecondary
328  duplicateChecker,
329  false, //dropDescendants(),
331  indexesIntoFiles,
332  0, //currentIndexIntoFile,
334  false, //bypassVersionCheck(),
335  true, //labelRawDataLikeMC(),
336  false, //usingGoToEvent_,
337  true, //enablePrefetching_,
338  false); //enforceGUIDInFileName_);
339 }
340 
341 std::shared_ptr<WrapperBase> RepeatingCachedRootSource::getProduct(unsigned int iStreamIndex,
342  BranchID const& k,
343  EDProductGetter const* ep) const {
344  return cachedWrappers_[streamToCacheIndex_[iStreamIndex]][branchIDToWrapperIndex_.find(k)->second];
345 }
346 
348  auto v = presentState_;
349  switch (presentState_) {
350  case ItemType::IsFile:
352  break;
353  case ItemType::IsRun:
355  break;
356  case ItemType::IsLumi:
358  break;
359  default:
360  break;
361  }
362  return v;
363 }
364 
366  rootFile_->readLuminosityBlock_(lumiPrincipal);
367 }
368 
369 std::shared_ptr<LuminosityBlockAuxiliary> RepeatingCachedRootSource::readLuminosityBlockAuxiliary_() {
370  return rootFile_->readLuminosityBlockAuxiliary_();
371 }
373  auto index = eventIndex_++;
374 
375  auto repeatedIndex = index % cachedWrappers_.size();
376 
377  auto const& aux = eventAuxs_[repeatedIndex];
378 
379  auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
380 
381  streamToCacheIndex_[eventPrincipal.streamID().value()] = repeatedIndex;
382  eventPrincipal.fillEventPrincipal(aux,
383  history,
388  &delayedReaders_[eventPrincipal.streamID().value()]);
389 }
390 
391 std::shared_ptr<RunAuxiliary> RepeatingCachedRootSource::readRunAuxiliary_() {
392  return rootFile_->readRunAuxiliary_();
393  ;
394 }
395 
396 void RepeatingCachedRootSource::readRun_(RunPrincipal& runPrincipal) { rootFile_->readRun_(runPrincipal); }
397 
399  EventPrincipal& eventPrincipal,
400  StreamContext& streamContext) {
401  return false;
402 }
403 
405 
406 bool RepeatingCachedRootSource::goToEvent_(EventID const& eventID) { return false; }
407 
408 void RepeatingCachedRootSource::fillProcessBlockHelper_() { rootFile_->fillProcessBlockHelper_(); }
409 
411  return rootFile_->nextProcessBlock_(processBlockPrincipal);
412 }
413 
415  rootFile_->readProcessBlock_(processBlockPrincipal);
416 }
417 
419  auto itFound = map_->find(iPID);
420  if (itFound == map_->end()) {
421  return nullptr;
422  }
423  return (*wrappers_)[itFound->second].get();
424 }
425 
426 std::optional<std::tuple<WrapperBase const*, unsigned int>>
428  return {};
429 };
430 
432  std::vector<WrapperBase const*>& foundContainers,
433  std::vector<unsigned int>& keys) const {}
434 
436  unsigned int key,
437  ProductID const& thinned) const {
438  return {};
439 }
441 
442 //
443 // const member functions
444 //
445 
446 //
447 // static member functions
448 //
449 
std::vector< RCDelayedReader > delayedReaders_
std::map< edm::ProductID, size_t > productIDToWrapperIndex_
void beginJob() override
Begin protected makes it easier to do template programming.
ProcessConfigurationID setProcessConfigurationID()
void readEvent_(EventPrincipal &eventPrincipal) override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
EventAuxiliary const & aux() const
std::map< edm::BranchID, size_t > branchIDToWrapperIndex_
std::variant< unsigned int, detail::GetThinnedKeyFromExceptionFactory, std::monostate > OptionalThinnedKey
void readRun_(RunPrincipal &runPrincipal) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> const * postEventReadFromSourceSignal() const final
static std::unique_ptr< EventSkipperByID > create(ParameterSet const &pset)
int remainingEvents() const
Definition: InputSource.h:209
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
std::unique_ptr< RootFile > rootFile_
void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const *> &foundContainers, std::vector< unsigned int > &keys) const override
void setParameterSetID(ParameterSetID const &pSetID)
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
void readProcessBlock_(ProcessBlockPrincipal &) override
std::shared_ptr< ProcessBlockHelper const > processBlockHelper() const
Accessors for processBlockHelper.
Definition: InputSource.h:178
StreamID streamID() const
assert(be >=bs)
unsigned int const defaultCacheSize
Definition: RootTree.h:38
unsigned int LuminosityBlockNumber_t
bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
std::vector< RCProductGetter > getters_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:184
std::vector< EventSelectionID > EventSelectionIDVector
static std::string const input
Definition: EdmProvDump.cc:50
OptionalThinnedKey getThinnedKeyFrom(ProductID const &parent, unsigned int key, ProductID const &thinned) const override
ProductID branchIDToProductID(BranchID const &bid) const
std::vector< BranchListIndex > BranchListIndexes
RCProductGetter(std::map< edm::ProductID, size_t > const *iMap, std::vector< std::shared_ptr< edm::WrapperBase >> const *iWrappers)
ProductProvenanceRetriever provRetriever_
#define DEFINE_FWK_INPUT_SOURCE(type)
std::optional< std::tuple< WrapperBase const *, unsigned int > > getThinnedProduct(ProductID const &, unsigned int key) const override
std::vector< ProcessHistoryID > orderedProcessHistoryIDs_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:172
WrapperBase const * getIt(ProductID const &) const override
std::shared_ptr< RunAuxiliary > readRunAuxiliary_() override
bool nextProcessBlock_(ProcessBlockPrincipal &) override
key
prepare the HTCondor submission files and eventually submit them
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
std::shared_ptr< WrapperBase > getProduct(unsigned int iStreamIndex, BranchID const &k, EDProductGetter const *ep) const
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:223
void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal) override
edm::propagate_const< std::unique_ptr< RunHelperBase > > runHelper_
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:168
std::shared_ptr< edm::WrapperBase > getProduct_(edm::BranchID const &k, edm::EDProductGetter const *ep) final
std::vector< std::shared_ptr< edm::WrapperBase > > const * wrappers_
std::vector< std::vector< std::shared_ptr< edm::WrapperBase > > > cachedWrappers_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> const * preEventReadFromSourceSignal() const final
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:165
void add(std::string const &label, ParameterSetDescription const &psetDescription)
RepeatingCachedRootSource(ParameterSet const &pset, InputSourceDescription const &desc)
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
int remainingLuminosityBlocks() const
Definition: InputSource.h:217
HLT enums.
def cache(function)
Definition: utilities.py:3
RCProductGetter const & operator=(RCProductGetter const &iOther)
void updateFromInput(ProductList const &other)
EventSelectionIDVector const & eventSelectionIDs() const
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:117
unsigned int RunNumber_t
unsigned int value() const
Definition: StreamID.h:43
std::unique_ptr< RootFile > makeRootFile(std::string const &logicalFileName, std::string const &pName, bool isSkipping, std::shared_ptr< InputFile > filePtr, std::shared_ptr< EventSkipperByID > skipper, std::shared_ptr< DuplicateChecker > duplicateChecker, std::vector< std::shared_ptr< IndexIntoFile >> &indexesIntoFiles)
std::map< edm::ProductID, size_t > const * map_
std::vector< EventAuxiliary > eventAuxs_
BranchListIndexes const & branchListIndexes() const
static ParameterSetID emptyParameterSetID()
Definition: ParameterSet.cc:94
DelayedReader * reader() const
Definition: Principal.h:181
bool goToEvent_(EventID const &eventID) override
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:255
static void fillDescriptions(ConfigurationDescriptions &descriptions)