CMS 3D CMS Logo

PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
4 #include "InputFile.h"
7 #include "RunHelper.h"
26 
27 #include <set>
28 
29 namespace edm {
30 
31  class BranchID;
32  class LuminosityBlockID;
33  class EventID;
34  class ThinnedAssociationsHelper;
35 
36  namespace {
// Compares the process histories of a primary and a secondary principal.
// Passes when the two histories are equal or when isAncestor(ph2, ph1) is true;
// otherwise throws Exception with category errors::MismatchedInputFiles.
// NOTE(review): the exception context string reads "PoolSource::checkConsistency"
// although this function is named checkHistoryConsistency — confirm whether intended.
37  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
38  ProcessHistory const& ph1 = primary.processHistory();
39  ProcessHistory const& ph2 = secondary.processHistory();
// ph2 is the secondary's history, ph1 the primary's.
40  if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
41  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
42  << "The secondary file is not an ancestor of the primary file\n";
43  }
44  }
// Event-level consistency check between primary and secondary input.
// Throws Exception (errors::MismatchedInputFiles) when isSameEvent(primary, secondary)
// is false; the message starts with the primary's id().
45  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
46  if (!isSameEvent(primary, secondary)) {
47  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
48  << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
49  }
50  }
// Luminosity-block-level consistency check between primary and secondary input.
// Only the id() values of the two auxiliaries are compared; a mismatch throws
// Exception (errors::MismatchedInputFiles) with the primary's id() in the message.
51  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
52  if (primary.id() != secondary.id()) {
53  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
54  << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
55  }
56  }
// Run-level consistency check between primary and secondary input.
// Only the id() values of the two auxiliaries are compared; a mismatch throws
// Exception (errors::MismatchedInputFiles) with the primary's id() in the message.
57  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
58  if (primary.id() != secondary.id()) {
59  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
60  << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
61  }
62  }
63  } // namespace
64 
66  : InputSource(pset, desc),
67  rootServiceChecker_(),
68  catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
69  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
70  secondaryCatalog_(
71  pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
72  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
73  secondaryRunPrincipal_(),
74  secondaryLumiPrincipal_(),
75  secondaryEventPrincipals_(),
76  branchIDsToReplace_(),
77  nStreams_(desc.allocations_->numberOfStreams()),
78  skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
79  bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
80  treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
81  productSelectorRules_(pset, "inputCommands", "InputSource"),
82  dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
83  labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
84  delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
85  runHelper_(makeRunHelper(pset)),
86  resourceSharedWithDelayedReaderPtr_(),
87  // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
88  // initialized previously in their own initialization.
89  primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
90  secondaryFileSequence_(
91  secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
93  resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
95 
96  if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
97  throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
98  }
101  for (unsigned int index = 0; index < nStreams_; ++index) {
102  secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
103  secondaryFileSequence_->fileBranchIDListHelper(),
104  std::make_shared<ThinnedAssociationsHelper const>(),
106  nullptr,
107  index));
108  }
109  std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
110  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
111  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
112  std::set<BranchID> associationsFromSecondary;
113  //this is the registry used by the 'outside' world and only has the primary file information in it at present
115  for (auto const& item : secondary) {
116  if (item.second.present()) {
117  idsToReplace[item.second.branchType()].insert(item.second.branchID());
118  if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
119  associationsFromSecondary.insert(item.second.branchID());
120  }
121  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
122  auto itFound = fullList.find(item.first);
123  if (itFound != fullList.end()) {
124  // If the branch in primary file was dropped, need to initialize the dictionary information
125  if (itFound->second.dropped()) {
126  itFound->second.initFromDictionary();
127  }
128  itFound->second.setDropped(false);
129  }
130  }
131  }
132  for (auto const& item : primary) {
133  if (item.second.present()) {
134  idsToReplace[item.second.branchType()].erase(item.second.branchID());
135  associationsFromSecondary.erase(item.second.branchID());
136  }
137  }
138  if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
139  secondaryFileSequence_ = nullptr; // propagate_const<T> has no reset() function
140  } else {
141  for (int i = InEvent; i < NumBranchTypes; ++i) {
142  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
143  for (auto const& id : idsToReplace[i]) {
144  branchIDsToReplace_[i].push_back(id);
145  }
146  }
147  secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
148  }
149  }
150  }
151 
153 
156  secondaryFileSequence_->endJob();
157  primaryFileSequence_->endJob();
159  }
160 
// Obtains a FileBlock from the primary file sequence and returns it.
// NOTE(review): listing line 163 is missing from this extract, so the closing brace
// on listing line 165 has no visible opener; the setNotFastClonable() call is
// presumably guarded by a condition — confirm against the complete file.
161  std::shared_ptr<FileBlock> PoolSource::readFile_() {
162  std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
164  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
165  }
166  return fb;
167  }
168 
170 
// Forwards to the primary file sequence and returns its RunAuxiliary pointer unchanged.
171  std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
172 
// Forwards to the primary file sequence and returns its LuminosityBlockAuxiliary pointer unchanged.
173  std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
174  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
175  }
176 
// Delegates to the primary file sequence; the secondary sequence is not involved here.
177  void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
178 
180  return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
181  }
182 
184  primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
185  }
186 
// Reads the run into runPrincipal from the primary file sequence. When a secondary
// file sequence exists, the primary readRun_ call returned true, and there are
// run-level branch IDs to replace, the secondary sequence is positioned on the same
// run number and a fresh secondaryRunPrincipal_ is built from it.
// Throws Exception (errors::MismatchedInputFiles) if the run is not found in the
// secondary files, or if one of the consistency checks below fails.
// NOTE(review): listing lines 197 and 199 are missing from this extract, so the
// statements between setAux() and the history check, and after the history check,
// are not visible here — confirm against the complete file.
187  void PoolSource::readRun_(RunPrincipal& runPrincipal) {
188  bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
189  if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
// Only the run number is supplied; the remaining two arguments are passed as 0.
190  bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
191  if (found) {
192  std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
// Run ids of primary and secondary must agree before the secondary principal is built.
193  checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
194  secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(
195  secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, runPrincipal.index());
196  secondaryRunPrincipal_->setAux(*secondaryAuxiliary);
198  checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
200  } else {
201  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
202  << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
203  }
204  }
205  }
206 
208  bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
209  if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
210  bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
211  if (found) {
212  std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
213  secondaryFileSequence_->readLuminosityBlockAuxiliary_();
214  checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
215  secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
216  secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, lumiPrincipal.index());
217  secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
218  secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
219  checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
221  } else {
222  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
223  << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
224  << " is not found in the secondary input files\n";
225  }
226  }
227  }
228 
// Reads one event from the primary file sequence into eventPrincipal and asserts
// that the read succeeded. The visible code then looks up the same
// run / luminosity block / event in the secondary file sequence, reads it into the
// secondary EventPrincipal selected by the event's stream ID, checks consistency,
// recombines the event-level branches listed in branchIDsToReplace_[InEvent] into
// eventPrincipal, merges the provenance retrievers and clears the secondary principal.
// Throws Exception (errors::MismatchedInputFiles) if the event is not found in the
// secondary files. Finally, unless delayReadingEventProducts_ is set, all products
// are read via readAllFromSourceAndMergeImmediately().
// NOTE(review): listing line 232 is missing from this extract; the closing brace on
// listing line 248 has no visible opener, so the secondary-file handling is presumably
// guarded by a condition — confirm against the complete file.
229  void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
230  bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
231  assert(readEventSucceeded);
233  bool found = secondaryFileSequence_->skipToItem(
234  eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
235  if (found) {
// One secondary principal per stream; indexed by this event's stream ID value.
236  EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
// NOTE(review): this local shadows the outer readEventSucceeded, and it is asserted
// only after the two consistency checks have already used secondaryEventPrincipal.
237  bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
238  checkConsistency(eventPrincipal, secondaryEventPrincipal);
239  checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
240  assert(readEventSucceeded);
241  eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
242  eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
243  secondaryEventPrincipal.clearPrincipal();
244  } else {
245  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
246  << eventPrincipal.id() << " is not found in the secondary input files\n";
247  }
248  }
249  if (not delayReadingEventProducts_) {
250  eventPrincipal.readAllFromSourceAndMergeImmediately();
251  }
252  }
253 
// Reads the event identified by `id` into eventPrincipal.
// Returns false, without reading anything, when the primary file sequence cannot
// skip to that run / luminosity block / event; returns true after readEvent_() ran.
254  bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
255  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
256  if (!found)
257  return false;
// The sentry object stays alive for the duration of the readEvent_() call below.
258  EventSourceSentry sentry(*this, streamContext);
259  readEvent_(eventPrincipal);
260  return true;
261  }
262 
267  InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
269  if (itemType == ItemType::IsRun || itemType == ItemType::IsLumi || itemType == ItemType::IsEvent) {
270  if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
272  }
273  }
274  }
275  return runHelper_->nextItemType(state(), itemType, run, lumi, event);
276  }
277 
// Returns the raw pointers held by resourceSharedWithDelayedReaderPtr_ and
// mutexSharedWithDelayedReader_ (obtained with get()), packed into a pair.
278  std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
279  return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
280  }
281 
282  // Rewind to before the first event that was read.
284 
285  // Advance "offset" events. Offset can be positive or negative (or zero).
287 
// Forwards eventID to the primary file sequence's goToEvent() and returns its result.
288  bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
289 
292 
293  std::vector<std::string> defaultStrings;
294  desc.setComment("Reads EDM/Root files.");
295  desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
296  desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
297  ->setComment("Names of secondary files to be processed.");
298  desc.addUntracked<bool>("needSecondaryFileNames", false)
299  ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
300  desc.addUntracked<std::string>("overrideCatalog", std::string());
301  desc.addUntracked<bool>("skipBadFiles", false)
302  ->setComment(
303  "True: Ignore any missing or unopenable input file.\n"
304  "False: Throw exception if missing or unopenable input file.");
305  desc.addUntracked<bool>("bypassVersionCheck", false)
306  ->setComment(
307  "True: Bypass release version check.\n"
308  "False: Throw exception if reading file in a release prior to the release in which the file was written.");
309  desc.addUntracked<int>("treeMaxVirtualSize", -1)
310  ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
311  desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
312  ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
313  desc.addUntracked<bool>("labelRawDataLikeMC", true)
314  ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
315  desc.addUntracked<bool>("delayReadingEventProducts", true)
316  ->setComment(
317  "If True: do not read a data product from the file until it is requested. If False: all event data "
318  "products are read upfront.");
323 
324  descriptions.add("source", desc);
325  }
326 
// Always returns true.
327  bool PoolSource::randomAccess_() const { return true; }
328 
330 
332 } // namespace edm
LuminosityBlockNumber_t luminosityBlock() const
void fillProcessBlockHelper_() override
Definition: PoolSource.cc:177
edm::propagate_const< std::unique_ptr< RootSecondaryFileSequence > > secondaryFileSequence_
Definition: PoolSource.h:98
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:65
void clearPrincipal()
Definition: Principal.cc:383
bool goToEvent_(EventID const &eventID) override
Definition: PoolSource.cc:288
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
std::unique_ptr< SharedResourcesAcquirer > resourceSharedWithDelayedReaderPtr_
Definition: PoolSource.h:95
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
bool nextProcessBlock_(ProcessBlockPrincipal &) override
Definition: PoolSource.cc:179
RunNumber_t run() const
edm::propagate_const< std::unique_ptr< RunHelperBase > > runHelper_
Definition: PoolSource.h:93
std::map< BranchKey, BranchDescription > ProductList
static void fillDescription(ParameterSetDescription &desc)
Definition: RunHelper.cc:240
edm::propagate_const< std::shared_ptr< RunPrincipal > > secondaryRunPrincipal_
Definition: PoolSource.h:79
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
RunNumber_t run() const
Definition: RunPrincipal.h:61
unsigned long long EventNumber_t
void readProcessBlock_(ProcessBlockPrincipal &) override
Definition: PoolSource.cc:183
void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal) override
Definition: PoolSource.cc:207
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
static void fillDescription(ParameterSetDescription &desc)
StreamID streamID() const
assert(be >=bs)
unsigned int LuminosityBlockNumber_t
unsigned int nStreams_
Definition: PoolSource.h:84
static constexpr EventNumber_t invalidEvent
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
Definition: PoolSource.cc:173
LuminosityBlockNumber_t luminosityBlock() const
~PoolSource() override
Definition: PoolSource.cc:152
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_() override
Definition: PoolSource.cc:278
bool randomAccess_() const override
Definition: PoolSource.cc:327
void mergeProvenanceRetrievers(EventPrincipal &other)
ProcessingController::ReverseState reverseState_() const override
Definition: PoolSource.cc:331
bool delayReadingEventProducts_
Definition: PoolSource.h:91
void readRun_(RunPrincipal &runPrincipal) override
Definition: PoolSource.cc:187
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipal > > > secondaryEventPrincipals_
Definition: PoolSource.h:81
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
void readEvent_(EventPrincipal &eventPrincipal) override
Definition: PoolSource.cc:229
static constexpr RunNumber_t invalidRun
static SharedResourcesRegistry * instance()
static void reportReadBranches()
Definition: InputFile.cc:105
std::shared_ptr< RunAuxiliary > readRunAuxiliary_() override
Definition: PoolSource.cc:171
RunIndex index() const
Definition: RunPrincipal.h:56
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:223
std::unique_ptr< RunHelperBase > makeRunHelper(ParameterSet const &pset)
Definition: RunHelper.cc:12
InputFileCatalog secondaryCatalog_
Definition: PoolSource.h:78
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:290
ProcessingController::ForwardState forwardState_() const override
Definition: PoolSource.cc:329
static constexpr LuminosityBlockNumber_t invalidLumi
bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
Definition: PoolSource.cc:254
std::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:82
void add(std::string const &label, ParameterSetDescription const &psetDescription)
ProductList & productListUpdator()
void endJob() override
Definition: PoolSource.cc:154
void skip(int offset) override
Definition: PoolSource.cc:286
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
HLT enums.
void readAllFromSourceAndMergeImmediately(MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr)
Definition: Principal.cc:902
ItemTypeInfo state() const
Definition: InputSource.h:361
LuminosityBlockIndex index() const
ItemTypeInfo getNextItemType() override
Definition: PoolSource.cc:263
std::shared_ptr< std::recursive_mutex > mutexSharedWithDelayedReader_
Definition: PoolSource.h:96
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:117
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:59
unsigned int RunNumber_t
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
Definition: StreamID.h:43
void rewind_() override
Definition: PoolSource.cc:283
std::pair< SharedResourcesAcquirer, std::shared_ptr< std::recursive_mutex > > createAcquirerForSourceDelayedReader()
edm::propagate_const< std::shared_ptr< LuminosityBlockPrincipal > > secondaryLumiPrincipal_
Definition: PoolSource.h:80
EventID const & id() const
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:824
edm::propagate_const< std::unique_ptr< RootPrimaryFileSequence > > primaryFileSequence_
Definition: PoolSource.h:97
def move(src, dest)
Definition: eostools.py:511
EventNumber_t event() const
Definition: EventID.h:40
Definition: event.py:1
std::shared_ptr< FileBlock > readFile_() override
Definition: PoolSource.cc:161
void closeFile_() override
Definition: PoolSource.cc:169