CMS 3D CMS Logo

PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
4 #include "InputFile.h"
7 #include "RunHelper.h"
26 
27 #include <set>
28 
29 namespace edm {
30 
31  class BranchID;
32  class LuminosityBlockID;
33  class EventID;
34  class ThinnedAssociationsHelper;
35 
36  namespace {
37  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
38  ProcessHistory const& ph1 = primary.processHistory();
39  ProcessHistory const& ph2 = secondary.processHistory();
40  if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
41  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
42  << "The secondary file is not an ancestor of the primary file\n";
43  }
44  }
45  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
46  if (!isSameEvent(primary, secondary)) {
47  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
48  << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
49  }
50  }
51  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
52  if (primary.id() != secondary.id()) {
53  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
54  << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
55  }
56  }
57  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
58  if (primary.id() != secondary.id()) {
59  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
60  << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
61  }
62  }
63  } // namespace
64 
// PoolSource constructor: member-initializer list followed by the constructor body.
// NOTE(review): the signature line (listing line 65) and listing lines 92, 94, 99-100, 105 and 114
// are not present in this extract; comments below describe only what is visible.
66  : InputSource(pset, desc),
67  rootServiceChecker_(),
// The primary catalog is built from "fileNames"; both catalogs receive the same "overrideCatalog" value.
68  catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
69  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
// "secondaryFileNames" defaults to an empty list when not configured.
70  secondaryCatalog_(
71  pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
72  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
73  secondaryRunPrincipal_(),
74  secondaryLumiPrincipal_(),
75  secondaryEventPrincipals_(),
76  branchIDsToReplace_(),
77  nStreams_(desc.allocations_->numberOfStreams()),
78  skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
79  bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
80  treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
81  productSelectorRules_(pset, "inputCommands", "InputSource"),
82  dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
83  labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
84  delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
85  runHelper_(makeRunHelper(pset)),
86  resourceSharedWithDelayedReaderPtr_(),
87  // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
88  // initialized previously in their own initialization.
89  primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
// No secondary file sequence is created when the secondary catalog is empty.
90  secondaryFileSequence_(
91  secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
// NOTE(review): `resources` is declared on a line that is missing from this extract.
93  resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
95 
// Configuration error: the job demanded secondary files but none were listed.
96  if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
97  throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
98  }
// NOTE(review): the statement opening the scope closed at listing line 145 is missing here;
// presumably it tests that secondaryFileSequence_ is non-null - confirm against upstream.
// One secondary EventPrincipal is created per stream, built on the secondary file's product registry.
101  for (unsigned int index = 0; index < nStreams_; ++index) {
102  secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
103  secondaryFileSequence_->fileBranchIDListHelper(),
104  std::make_shared<ThinnedAssociationsHelper const>(),
106  nullptr,
107  index));
108  }
// idsToReplace collects, per branch type, the branch IDs that are present in the secondary file.
109  std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
110  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
111  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
112  std::set<BranchID> associationsFromSecondary;
113  //this is the registry used by the 'outside' world and only has the primary file information in it at present
// NOTE(review): `fullList` (used below) is declared on the missing listing line 114.
115  for (auto const& item : secondary) {
116  if (item.second.present()) {
117  idsToReplace[item.second.branchType()].insert(item.second.branchID());
// Event-level ThinnedAssociation branches are additionally tracked in their own set.
118  if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
119  associationsFromSecondary.insert(item.second.branchID());
120  }
121  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
122  auto itFound = fullList.find(item.first);
123  if (itFound != fullList.end()) {
124  itFound->second.setDropped(false);
125  }
126  }
127  }
// Anything also present in the primary file is removed, leaving only branches found solely in the secondary file.
128  for (auto const& item : primary) {
129  if (item.second.present()) {
130  idsToReplace[item.second.branchType()].erase(item.second.branchID());
131  associationsFromSecondary.erase(item.second.branchID());
132  }
133  }
// If no Event, Lumi or Run branch needs the secondary file, the secondary sequence is discarded.
134  if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
135  secondaryFileSequence_ = nullptr; // propagate_const<T> has no reset() function
136  } else {
// Copy each per-branch-type set into the corresponding branchIDsToReplace_ vector.
137  for (int i = InEvent; i < NumBranchTypes; ++i) {
138  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
139  for (auto const& id : idsToReplace[i]) {
140  branchIDsToReplace_[i].push_back(id);
141  }
142  }
143  secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
144  }
145  }
146  }
147 
149 
// Tail of PoolSource::endJob(): forwards endJob() to the secondary and then the primary file sequence.
// NOTE(review): the signature and listing lines 151 and 154 are missing from this extract;
// presumably the secondary call is guarded by a null check - confirm against upstream.
152  secondaryFileSequence_->endJob();
153  primaryFileSequence_->endJob();
155  }
156 
// Obtains the next FileBlock from the primary file sequence and returns it.
// The block is flagged as not fast-clonable with reason FileBlock::HasSecondaryFileSequence.
// NOTE(review): listing line 159 is missing, leaving the brace at line 161 without a visible opener;
// presumably the flag is set only when a secondary file sequence exists - confirm against upstream.
157  std::shared_ptr<FileBlock> PoolSource::readFile_() {
158  std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
160  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
161  }
162  return fb;
163  }
164 
166 
167  std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
168 
169  std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
170  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
171  }
172 
173  void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
174 
// Tail of PoolSource::nextProcessBlock_(ProcessBlockPrincipal&): returns whatever the primary file
// sequence reports. NOTE(review): the signature line (listing line 175) is missing from this extract.
176  return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
177  }
178 
// Tail of PoolSource::readProcessBlock_(ProcessBlockPrincipal&): delegates the read to the primary
// file sequence. NOTE(review): the signature line (listing line 179) is missing from this extract.
180  primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
181  }
182 
// Reads the run from the primary file sequence and, when run-level branches must come from the
// secondary files, locates the same run there and validates it against the primary.
// Throws Exception(errors::MismatchedInputFiles) if the run cannot be found in the secondary files.
// NOTE(review): listing lines 192, 195 and 197 are missing from this extract (one constructor argument
// and two statements), so the visible code is not the complete function.
183  void PoolSource::readRun_(RunPrincipal& runPrincipal) {
184  bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
// The secondary file is consulted only if it exists, the run is to be processed, and some run branch needs it.
185  if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
// Lumi and event are passed as 0U: only the run number is used to position the secondary sequence.
186  bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
187  if (found) {
188  std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
// Run IDs must match between the two files.
189  checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
190  secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryAuxiliary,
191  secondaryFileSequence_->fileProductRegistry(),
193  nullptr,
194  runPrincipal.index());
// The secondary process history must equal, or be an ancestor of, the primary one.
196  checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
198  } else {
199  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
200  << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
201  }
202  }
203  }
204 
// Body of PoolSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal).
// Reads the lumi from the primary file sequence and, when lumi-level branches must come from the
// secondary files, reads the matching lumi there and validates it against the primary.
// Throws Exception(errors::MismatchedInputFiles) if the lumi cannot be found in the secondary files.
// NOTE(review): the signature line (listing line 205) and listing line 218 are missing from this extract.
206  bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
207  if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
// Event is passed as 0U: the secondary sequence is positioned by run and lumi only.
208  bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
209  if (found) {
210  std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
211  secondaryFileSequence_->readLuminosityBlockAuxiliary_();
// Lumi IDs must match between the two files.
212  checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
// A fresh secondary lumi principal is created for every lumi, sharing the primary principal's index.
213  secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
214  secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, lumiPrincipal.index());
215  secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
216  secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
// The secondary process history must equal, or be an ancestor of, the primary one.
217  checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
219  } else {
220  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
221  << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
222  << " is not found in the secondary input files\n";
223  }
224  }
225  }
226 
// Reads the next event from the primary file sequence into eventPrincipal and, in the secondary-file
// branch, merges in the branches listed in branchIDsToReplace_[InEvent] from the matching secondary event.
// Throws Exception(errors::MismatchedInputFiles) if the event is absent from the secondary files.
// NOTE(review): listing line 230 is missing, leaving the brace at line 246 without a visible opener;
// presumably it is a guard on secondaryFileSequence_ and the event branch list - confirm against upstream.
227  void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
228  bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
229  assert(readEventSucceeded);
// Position the secondary sequence on the same run / lumi / event as the primary event.
231  bool found = secondaryFileSequence_->skipToItem(
232  eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
233  if (found) {
// Each stream has its own secondary EventPrincipal, selected by the stream ID value.
234  EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
// NOTE(review): this declaration shadows the outer readEventSucceeded, and it is asserted only after
// the two consistency checks have already used secondaryEventPrincipal.
235  bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
236  checkConsistency(eventPrincipal, secondaryEventPrincipal);
237  checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
238  assert(readEventSucceeded);
// Merge the secondary-only branches and provenance retrievers into the primary principal, then clear the scratch principal.
239  eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
240  eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
241  secondaryEventPrincipal.clearPrincipal();
242  } else {
243  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
244  << eventPrincipal.id() << " is not found in the secondary input files\n";
245  }
246  }
// With delayed reading disabled, all products are read from the source right away.
247  if (not delayReadingEventProducts_) {
248  eventPrincipal.readAllFromSourceAndMergeImmediately();
249  }
250  }
251 
252  bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
253  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
254  if (!found)
255  return false;
256  EventSourceSentry sentry(*this, streamContext);
257  readEvent_(eventPrincipal);
258  return true;
259  }
260 
// Body of PoolSource::getNextItemType(): asks the primary file sequence for the next item type and
// lets runHelper_ produce the final answer. In the secondary-file branch, IsSynchronize is returned
// instead when the next run/lumi/event is not contained in the secondary sequence's current file.
// NOTE(review): the signature, the declarations of run/lumi/event (listing lines 261-264) and listing
// line 266 are missing from this extract; line 266 presumably guards on secondaryFileSequence_ - confirm.
265  InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
267  if (itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
268  if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
269  return IsSynchronize;
270  }
271  }
272  }
273  return runHelper_->nextItemType(state(), itemType, run, lumi, event);
274  }
275 
276  std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
277  return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
278  }
279 
280  // Rewind to before the first event that was read.
282 
283  // Advance "offset" events. Offset can be positive or negative (or zero).
285 
286  bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
287 
290 
// Body of PoolSource::fillDescriptions(ConfigurationDescriptions& descriptions): declares the untracked
// configuration parameters of this source with their defaults and help text, then registers the
// description under the label "source".
// NOTE(review): the signature, the declaration of `desc` (listing lines 288-289) and listing lines
// 317-320 are missing from this extract.
291  std::vector<std::string> defaultStrings;
292  desc.setComment("Reads EDM/Root files.");
// "fileNames" has no default value here, unlike "secondaryFileNames".
293  desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
294  desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
295  ->setComment("Names of secondary files to be processed.");
296  desc.addUntracked<bool>("needSecondaryFileNames", false)
297  ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
298  desc.addUntracked<std::string>("overrideCatalog", std::string());
299  desc.addUntracked<bool>("skipBadFiles", false)
300  ->setComment(
301  "True: Ignore any missing or unopenable input file.\n"
302  "False: Throw exception if missing or unopenable input file.");
303  desc.addUntracked<bool>("bypassVersionCheck", false)
304  ->setComment(
305  "True: Bypass release version check.\n"
306  "False: Throw exception if reading file in a release prior to the release in which the file was written.");
307  desc.addUntracked<int>("treeMaxVirtualSize", -1)
308  ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
309  desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
310  ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
311  desc.addUntracked<bool>("labelRawDataLikeMC", true)
312  ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
313  desc.addUntracked<bool>("delayReadingEventProducts", true)
314  ->setComment(
315  "If True: do not read a data product from the file until it is requested. If False: all event data "
316  "products are read upfront.");
321 
// Publish the completed description under the module label "source".
322  descriptions.add("source", desc);
323  }
324 
325  bool PoolSource::randomAccess_() const { return true; }
326 
328 
330 } // namespace edm
LuminosityBlockNumber_t luminosityBlock() const
size
Write out results.
void fillProcessBlockHelper_() override
Definition: PoolSource.cc:173
edm::propagate_const< std::unique_ptr< RootSecondaryFileSequence > > secondaryFileSequence_
Definition: PoolSource.h:98
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:65
void clearPrincipal()
Definition: Principal.cc:382
bool goToEvent_(EventID const &eventID) override
Definition: PoolSource.cc:286
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
std::unique_ptr< SharedResourcesAcquirer > resourceSharedWithDelayedReaderPtr_
Definition: PoolSource.h:95
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:328
bool nextProcessBlock_(ProcessBlockPrincipal &) override
Definition: PoolSource.cc:175
RunNumber_t run() const
edm::propagate_const< std::unique_ptr< RunHelperBase > > runHelper_
Definition: PoolSource.h:93
std::map< BranchKey, BranchDescription > ProductList
static void fillDescription(ParameterSetDescription &desc)
Definition: RunHelper.cc:240
edm::propagate_const< std::shared_ptr< RunPrincipal > > secondaryRunPrincipal_
Definition: PoolSource.h:79
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
RunNumber_t run() const
Definition: RunPrincipal.h:61
unsigned long long EventNumber_t
void readProcessBlock_(ProcessBlockPrincipal &) override
Definition: PoolSource.cc:179
void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal) override
Definition: PoolSource.cc:205
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
static void fillDescription(ParameterSetDescription &desc)
StreamID streamID() const
assert(be >=bs)
unsigned int LuminosityBlockNumber_t
unsigned int nStreams_
Definition: PoolSource.h:84
static constexpr EventNumber_t invalidEvent
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
Definition: PoolSource.cc:169
LuminosityBlockNumber_t luminosityBlock() const
~PoolSource() override
Definition: PoolSource.cc:148
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_() override
Definition: PoolSource.cc:276
bool randomAccess_() const override
Definition: PoolSource.cc:325
void mergeProvenanceRetrievers(EventPrincipal &other)
ProcessingController::ReverseState reverseState_() const override
Definition: PoolSource.cc:329
bool delayReadingEventProducts_
Definition: PoolSource.h:91
void readRun_(RunPrincipal &runPrincipal) override
Definition: PoolSource.cc:183
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipal > > > secondaryEventPrincipals_
Definition: PoolSource.h:81
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
void readEvent_(EventPrincipal &eventPrincipal) override
Definition: PoolSource.cc:227
static constexpr RunNumber_t invalidRun
static SharedResourcesRegistry * instance()
static void reportReadBranches()
Definition: InputFile.cc:106
std::shared_ptr< RunAuxiliary > readRunAuxiliary_() override
Definition: PoolSource.cc:167
RunIndex index() const
Definition: RunPrincipal.h:57
ItemType getNextItemType() override
Definition: PoolSource.cc:261
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:192
std::unique_ptr< RunHelperBase > makeRunHelper(ParameterSet const &pset)
Definition: RunHelper.cc:12
InputFileCatalog secondaryCatalog_
Definition: PoolSource.h:78
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:288
ProcessingController::ForwardState forwardState_() const override
Definition: PoolSource.cc:327
static constexpr LuminosityBlockNumber_t invalidLumi
bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
Definition: PoolSource.cc:252
std::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:82
void add(std::string const &label, ParameterSetDescription const &psetDescription)
ProductList & productListUpdator()
void endJob() override
Definition: PoolSource.cc:150
ItemType state() const
Definition: InputSource.h:330
void skip(int offset) override
Definition: PoolSource.cc:284
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
HLT enums.
void readAllFromSourceAndMergeImmediately(MergeableRunProductMetadata const *mergeableRunProductMetadata=nullptr)
Definition: Principal.cc:968
LuminosityBlockIndex index() const
std::shared_ptr< std::recursive_mutex > mutexSharedWithDelayedReader_
Definition: PoolSource.h:96
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:117
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:59
unsigned int RunNumber_t
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
Definition: StreamID.h:43
void rewind_() override
Definition: PoolSource.cc:281
std::pair< SharedResourcesAcquirer, std::shared_ptr< std::recursive_mutex > > createAcquirerForSourceDelayedReader()
edm::propagate_const< std::shared_ptr< LuminosityBlockPrincipal > > secondaryLumiPrincipal_
Definition: PoolSource.h:80
EventID const & id() const
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:890
edm::propagate_const< std::unique_ptr< RootPrimaryFileSequence > > primaryFileSequence_
Definition: PoolSource.h:97
def move(src, dest)
Definition: eostools.py:511
EventNumber_t event() const
Definition: EventID.h:40
Definition: event.py:1
std::shared_ptr< FileBlock > readFile_() override
Definition: PoolSource.cc:157
void closeFile_() override
Definition: PoolSource.cc:165