CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
RepeatingCachedRootSource.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Subsystem/Package
4 // Class : RepeatingCachedRootSource
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Mon, 15 Mar 2021 19:02:31 GMT
11 //
12 
13 // system include files
14 #include <memory>
15 
16 // user include files
24 
26 
37 
39 
40 #include "RunHelper.h"
41 #include "RootFile.h"
42 #include "InputFile.h"
43 #include "DuplicateChecker.h"
44 
45 namespace edm {
46  class RunHelperBase;
47  class RCDelayedReader;
48 
50  public:
52 
53  static void fillDescriptions(ConfigurationDescriptions& descriptions);
54 
55  std::shared_ptr<WrapperBase> getProduct(unsigned int iStreamIndex,
56  BranchID const& k,
57  EDProductGetter const* ep) const;
58 
60  public:
61  std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
62  return m_source->getProduct(m_streamIndex, k, ep);
63  }
64  void mergeReaders_(edm::DelayedReader*) final { assert(false); }
65  void reset_() final {}
66 
67  unsigned int m_streamIndex;
69 
71  const final {
72  return nullptr;
73  }
75  const final {
76  return nullptr;
77  }
78  };
79 
80  protected:
81  ItemType getNextItemType() override;
82  void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
83  std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
84  void readEvent_(EventPrincipal& eventPrincipal) override;
85 
86  private:
87  std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
88  void readRun_(RunPrincipal& runPrincipal) override;
89  bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
90  void skip(int offset) override;
91  bool goToEvent_(EventID const& eventID) override;
92  void beginJob() override;
93 
94  void fillProcessBlockHelper_() override;
97 
98  std::unique_ptr<RootFile> makeRootFile(std::string const& logicalFileName,
99  std::string const& pName,
100  bool isSkipping,
101  std::shared_ptr<InputFile> filePtr,
102  std::shared_ptr<EventSkipperByID> skipper,
103  std::shared_ptr<DuplicateChecker> duplicateChecker,
104  std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles);
105 
109  std::unique_ptr<RootFile> rootFile_;
110  std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
111  std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
112  std::vector<EventAuxiliary> eventAuxs_;
116  std::vector<RCDelayedReader> delayedReaders_; //one per stream
117  std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
118  std::vector<size_t> streamToCacheIndex_;
119  size_t nextEventIndex_ = 0;
121  unsigned long long eventIndex_ = 0;
122  };
123 } // namespace edm
124 
125 using namespace edm;
126 //
127 // constants, enums and typedefs
128 //
129 
130 //
131 // static data member definitions
132 //
133 
134 //
135 // constructors and destructor
136 //
138  : InputSource(pset, desc),
139  selectorRules_(pset, "inputCommands", "InputSource"),
140  runHelper_(std::make_unique<DefaultRunHelper>()),
141  cachedWrappers_(pset.getUntrackedParameter<unsigned int>("repeatNEvents")),
142  eventAuxs_(cachedWrappers_.size()),
143  provRetriever_(0),
144  delayedReaders_(desc.allocations_->numberOfStreams()),
145  streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
146  {
147  int index = 0;
148  std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
149  iR.m_streamIndex = index++;
150  iR.m_source = this;
151  });
152  }
153  auto logicalFileName = pset.getUntrackedParameter<std::string>("fileName");
154  InputFileCatalog catalog(std::vector<std::string>(1U, logicalFileName), "");
155  auto const& physicalFileName = catalog.fileCatalogItems().front().fileNames().front();
156  auto const nEventsToSkip = pset.getUntrackedParameter<unsigned int>("skipEvents");
157  std::shared_ptr<EventSkipperByID> skipper(EventSkipperByID::create(pset).release());
158 
159  auto duplicateChecker = std::make_shared<DuplicateChecker>(pset);
160 
161  std::vector<std::shared_ptr<IndexIntoFile>> indexesIntoFiles(1);
162 
163  auto input =
164  std::make_shared<InputFile>(physicalFileName.c_str(), " Initiating request to open file ", InputType::Primary);
166  logicalFileName, physicalFileName, 0 != nEventsToSkip, input, skipper, duplicateChecker, indexesIntoFiles);
167  rootFile_->reportOpened("repeating");
168 
169  auto const& prodList = rootFile_->productRegistry()->productList();
171 
172  //setup caching
173  auto nProdsInEvent =
174  std::count_if(prodList.begin(), prodList.end(), [](auto&& iV) { return iV.second.branchType() == edm::InEvent; });
175  {
176  size_t index = 0;
177  for (auto& prod : prodList) {
178  if (prod.second.branchType() == edm::InEvent) {
179  branchIDToWrapperIndex_[prod.second.branchID()] = index++;
180  }
181  }
182  }
183  for (auto& cache : cachedWrappers_) {
184  cache.resize(nProdsInEvent);
185  }
186 }
187 
191  processConfiguration.setProcessConfigurationID();
192 
193  //TODO: to make edm::Ref work we need to find a way to pass in a different EDProductGetter
194  EventPrincipal eventPrincipal(productRegistry(),
195  std::make_shared<BranchIDListHelper>(),
196  std::make_shared<ThinnedAssociationsHelper>(),
197  processConfiguration,
198  nullptr);
199 
200  {
201  RunNumber_t run = 0;
203  auto itAux = eventAuxs_.begin();
204  for (auto& cache : cachedWrappers_) {
205  rootFile_->nextEventEntry();
206  rootFile_->readCurrentEvent(eventPrincipal);
207  auto const& aux = eventPrincipal.aux();
208  *(itAux++) = aux;
209  if (0 == run) {
210  run = aux.run();
211  lumi = aux.luminosityBlock();
212  } else {
213  if (run != aux.run()) {
214  throw cms::Exception("EventsWithDifferentRuns") << "The requested events to cache are from different Runs";
215  }
216  if (lumi != aux.luminosityBlock()) {
217  throw cms::Exception("EventsWithDifferentLuminosityBlocks")
218  << "The requested events to cache are from different LuminosityBlocks";
219  }
220  }
221  selectionIDs_ = eventPrincipal.eventSelectionIDs();
222  branchListIndexes_ = eventPrincipal.branchListIndexes();
223  {
224  auto reader = eventPrincipal.reader();
225  for (auto const& branchToIndex : branchIDToWrapperIndex_) {
226  cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &eventPrincipal);
227  }
228  }
229  }
230  rootFile_->rewind();
231  }
232 }
233 
236  desc.setComment(
237  "Read only a few Events from one EDM/Root file, and repeat them in sequence. The Events are required to be from "
238  "the same Run and LuminosityBlock.");
239  desc.addUntracked<std::string>("fileName")->setComment("Name of file to be processed.");
240  desc.addUntracked<unsigned int>("repeatNEvents", 10U)
241  ->setComment("Number of events to read from file and then repeat in sequence.");
242  desc.addUntracked<unsigned int>("skipEvents", 0);
243  ProductSelectorRules::fillDescription(desc, "inputCommands");
245 
246  descriptions.add("source", desc);
247 }
248 
249 //
250 // member functions
251 //
252 
253 std::unique_ptr<RootFile> RepeatingCachedRootSource::makeRootFile(
254  std::string const& logicalFileName,
255  std::string const& pName,
256  bool isSkipping,
257  std::shared_ptr<InputFile> filePtr,
258  std::shared_ptr<EventSkipperByID> skipper,
259  std::shared_ptr<DuplicateChecker> duplicateChecker,
260  std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles) {
261  return std::make_unique<RootFile>(pName,
263  logicalFileName,
264  filePtr,
265  skipper,
266  isSkipping,
267  remainingEvents(),
269  1,
270  roottree::defaultCacheSize, //treeCacheSize_,
271  -1, //treeMaxVirtualSize(),
272  processingMode(),
273  runHelper_,
274  false, //noRunLumiSort_
275  true, //noEventSort_,
279  processBlockHelper().get(),
281  nullptr, // associationsFromSecondary
282  duplicateChecker,
283  false, //dropDescendants(),
285  indexesIntoFiles,
286  0, //currentIndexIntoFile,
288  false, //bypassVersionCheck(),
289  true, //labelRawDataLikeMC(),
290  false, //usingGoToEvent_,
291  true, //enablePrefetching_,
292  false); //enforceGUIDInFileName_);
293 }
294 
295 std::shared_ptr<WrapperBase> RepeatingCachedRootSource::getProduct(unsigned int iStreamIndex,
296  BranchID const& k,
297  EDProductGetter const* ep) const {
298  return cachedWrappers_[streamToCacheIndex_[iStreamIndex]][branchIDToWrapperIndex_.find(k)->second];
299 }
300 
302  auto v = presentState_;
303  switch (presentState_) {
304  case IsFile:
306  break;
307  case IsRun:
309  break;
310  case IsLumi:
312  break;
313  default:
314  break;
315  }
316  return v;
317 }
318 
320  return rootFile_->readLuminosityBlock_(lumiPrincipal);
321 }
322 
323 std::shared_ptr<LuminosityBlockAuxiliary> RepeatingCachedRootSource::readLuminosityBlockAuxiliary_() {
324  return rootFile_->readLuminosityBlockAuxiliary_();
325 }
327  auto index = eventIndex_++;
328 
329  auto repeatedIndex = index % cachedWrappers_.size();
330 
331  auto const& aux = eventAuxs_[repeatedIndex];
332 
333  auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
334 
335  streamToCacheIndex_[eventPrincipal.streamID().value()] = repeatedIndex;
336  eventPrincipal.fillEventPrincipal(aux,
337  history,
342  &delayedReaders_[eventPrincipal.streamID().value()]);
343 }
344 
345 std::shared_ptr<RunAuxiliary> RepeatingCachedRootSource::readRunAuxiliary_() {
346  return rootFile_->readRunAuxiliary_();
347  ;
348 }
349 
350 void RepeatingCachedRootSource::readRun_(RunPrincipal& runPrincipal) { rootFile_->readRun_(runPrincipal); }
351 
353  EventPrincipal& eventPrincipal,
354  StreamContext& streamContext) {
355  return false;
356 }
357 
359 
360 bool RepeatingCachedRootSource::goToEvent_(EventID const& eventID) { return false; }
361 
362 void RepeatingCachedRootSource::fillProcessBlockHelper_() { rootFile_->fillProcessBlockHelper_(); }
363 
365  return rootFile_->nextProcessBlock_(processBlockPrincipal);
366 }
367 
369  rootFile_->readProcessBlock_(processBlockPrincipal);
370 }
371 
372 //
373 // const member functions
374 //
375 
376 //
377 // static member functions
378 //
379 
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:140
std::vector< RCDelayedReader > delayedReaders_
T getUntrackedParameter(std::string const &, T const &) const
void beginJob() override
Begin protected makes it easier to do template programming.
EventSelectionIDVector const & eventSelectionIDs() const
ProcessConfigurationID setProcessConfigurationID()
void readEvent_(EventPrincipal &eventPrincipal) override
std::shared_ptr< ProcessBlockHelper const > processBlockHelper() const
Accessors for processBlockHelper.
Definition: InputSource.h:150
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
std::map< edm::BranchID, size_t > branchIDToWrapperIndex_
void readRun_(RunPrincipal &runPrincipal) override
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> const * postEventReadFromSourceSignal() const final
static std::unique_ptr< EventSkipperByID > create(ParameterSet const &pset)
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:224
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
std::unique_ptr< RootFile > rootFile_
void setParameterSetID(ParameterSetID const &pSetID)
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
void readProcessBlock_(ProcessBlockPrincipal &) override
assert(be >=bs)
unsigned int const defaultCacheSize
Definition: RootTree.h:38
BranchListIndexes const & branchListIndexes() const
unsigned int LuminosityBlockNumber_t
bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
std::vector< EventSelectionID > EventSelectionIDVector
static std::string const input
Definition: EdmProvDump.cc:47
void setComment(std::string const &value)
std::vector< BranchListIndex > BranchListIndexes
int remainingEvents() const
Definition: InputSource.h:178
edm::RepeatingCachedRootSource const * m_source
ProductProvenanceRetriever provRetriever_
#define DEFINE_FWK_INPUT_SOURCE(type)
std::vector< ProcessHistoryID > orderedProcessHistoryIDs_
std::shared_ptr< RunAuxiliary > readRunAuxiliary_() override
bool nextProcessBlock_(ProcessBlockPrincipal &) override
StreamID streamID() const
tuple reader
Definition: DQM.py:105
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
list lumi
Definition: dqmdumpme.py:53
void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal) override
edm::propagate_const< std::unique_ptr< RunHelperBase > > runHelper_
std::shared_ptr< edm::WrapperBase > getProduct_(edm::BranchID const &k, edm::EDProductGetter const *ep) final
DelayedReader * reader() const
Definition: Principal.h:187
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:144
unsigned int value() const
Definition: StreamID.h:43
std::shared_ptr< WrapperBase > getProduct(unsigned int iStreamIndex, BranchID const &k, EDProductGetter const *ep) const
std::vector< std::vector< std::shared_ptr< edm::WrapperBase > > > cachedWrappers_
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> const * preEventReadFromSourceSignal() const final
void add(std::string const &label, ParameterSetDescription const &psetDescription)
RepeatingCachedRootSource(ParameterSet const &pset, InputSourceDescription const &desc)
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:136
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:156
void updateFromInput(ProductList const &other)
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:117
int remainingLuminosityBlocks() const
Definition: InputSource.h:186
unsigned int RunNumber_t
std::unique_ptr< RootFile > makeRootFile(std::string const &logicalFileName, std::string const &pName, bool isSkipping, std::shared_ptr< InputFile > filePtr, std::shared_ptr< EventSkipperByID > skipper, std::shared_ptr< DuplicateChecker > duplicateChecker, std::vector< std::shared_ptr< IndexIntoFile >> &indexesIntoFiles)
std::vector< EventAuxiliary > eventAuxs_
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:192
static ParameterSetID emptyParameterSetID()
Definition: ParameterSet.cc:94
EventAuxiliary const & aux() const
tuple size
Write out results.
bool goToEvent_(EventID const &eventID) override
def cache
Definition: utilities.py:3
static void fillDescriptions(ConfigurationDescriptions &descriptions)