CMS 3D CMS Logo

PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
4 #include "InputFile.h"
7 #include "RunHelper.h"
26 
27 #include <set>
28 
29 namespace edm {
30 
31  class BranchID;
32  class LuminosityBlockID;
33  class EventID;
34  class ThinnedAssociationsHelper;
35 
36  namespace {
37  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
38  ProcessHistory const& ph1 = primary.processHistory();
39  ProcessHistory const& ph2 = secondary.processHistory();
40  if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
41  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
42  << "The secondary file is not an ancestor of the primary file\n";
43  }
44  }
45  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
46  if (!isSameEvent(primary, secondary)) {
47  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
48  << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
49  }
50  }
51  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
52  if (primary.id() != secondary.id()) {
53  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
54  << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
55  }
56  }
57  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
58  if (primary.id() != secondary.id()) {
59  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
60  << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
61  }
62  }
63  } // namespace
64 
66  : InputSource(pset, desc),
67  rootServiceChecker_(),
68  catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
69  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
70  secondaryCatalog_(
71  pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
72  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
73  secondaryRunPrincipal_(),
74  secondaryLumiPrincipal_(),
75  secondaryEventPrincipals_(),
76  branchIDsToReplace_(),
77  nStreams_(desc.allocations_->numberOfStreams()),
78  skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
79  bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
80  treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
81  productSelectorRules_(pset, "inputCommands", "InputSource"),
82  dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
83  labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
84  delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
85  runHelper_(makeRunHelper(pset)),
86  resourceSharedWithDelayedReaderPtr_(),
87  // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
88  // initialized previously in their own initialization.
89  primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
90  secondaryFileSequence_(
91  secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
93  resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
95 
96  if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
97  throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
98  }
101  for (unsigned int index = 0; index < nStreams_; ++index) {
102  secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
103  secondaryFileSequence_->fileBranchIDListHelper(),
104  std::make_shared<ThinnedAssociationsHelper const>(),
106  nullptr,
107  index));
108  }
109  std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
110  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
111  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
112  std::set<BranchID> associationsFromSecondary;
113  //this is the registry used by the 'outside' world and only has the primary file information in it at present
115  for (auto const& item : secondary) {
116  if (item.second.present()) {
117  idsToReplace[item.second.branchType()].insert(item.second.branchID());
118  if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
119  associationsFromSecondary.insert(item.second.branchID());
120  }
121  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
122  auto itFound = fullList.find(item.first);
123  if (itFound != fullList.end()) {
124  itFound->second.setDropped(false);
125  }
126  }
127  }
128  for (auto const& item : primary) {
129  if (item.second.present()) {
130  idsToReplace[item.second.branchType()].erase(item.second.branchID());
131  associationsFromSecondary.erase(item.second.branchID());
132  }
133  }
134  if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
135  secondaryFileSequence_ = nullptr; // propagate_const<T> has no reset() function
136  } else {
137  for (int i = InEvent; i < NumBranchTypes; ++i) {
138  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
139  for (auto const& id : idsToReplace[i]) {
140  branchIDsToReplace_[i].push_back(id);
141  }
142  }
143  secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
144  }
145  }
146  }
147 
149 
152  secondaryFileSequence_->endJob();
153  primaryFileSequence_->endJob();
155  }
156 
// Obtains a FileBlock from the primary file sequence's readFile_() and returns it.
// NOTE(review): listing line 159 is absent from this view. The
// setNotFastClonable() call and the closing brace at 161 presumably sit inside
// a conditional that is not visible here — confirm against the original file.
157  std::unique_ptr<FileBlock> PoolSource::readFile_() {
158  std::unique_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
// Mark the block as not fast clonable, giving HasSecondaryFileSequence as the reason.
160  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
161  }
162  return fb;
163  }
164 
166 
167  std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
168 
169  std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
170  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
171  }
172 
// Reads the run into runPrincipal through the primary file sequence, then
// looks the same run up in the secondary file sequence and recombines the
// secondary run principal into runPrincipal for the branches listed in
// branchIDsToReplace_[InRun].
// Throws errors::MismatchedInputFiles when skipToItem() does not find the run
// in the secondary input files; the consistency checks called below may throw
// the same error code.
// NOTE(review): listing lines 175 and 182 are absent from this view. Line 175
// presumably opens the conditional closed by the second-to-last brace, and
// line 182 presumably holds one more constructor argument — confirm against
// the original file.
173  void PoolSource::readRun_(RunPrincipal& runPrincipal) {
174  primaryFileSequence_->readRun_(runPrincipal);
// Position the secondary sequence on this run (lumi and event arguments are 0U).
176  bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
177  if (found) {
178  std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
// The primary and secondary run ids must match.
179  checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
180  secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryAuxiliary,
181  secondaryFileSequence_->fileProductRegistry(),
183  nullptr,
184  runPrincipal.index());
185  secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
// The process histories must be equal, or the secondary an ancestor of the primary.
186  checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
187  runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
188  } else {
189  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
190  << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
191  }
192  }
193  }
194 
196  primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
198  bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
199  if (found) {
200  std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
201  secondaryFileSequence_->readLuminosityBlockAuxiliary_();
202  checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
203  secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
204  secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, lumiPrincipal.index());
205  secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
206  secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
207  checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
208  lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
209  } else {
210  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
211  << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
212  << " is not found in the secondary input files\n";
213  }
214  }
215  }
216 
// Reads the event into eventPrincipal through the primary file sequence, then
// looks the same event up in the secondary file sequence, recombines the
// secondary event principal into eventPrincipal for the branches listed in
// branchIDsToReplace_[InEvent], and merges the provenance retrievers.
// Throws errors::MismatchedInputFiles when skipToItem() does not find the
// event in the secondary input files; the consistency checks called below may
// throw the same error code.
// NOTE(review): listing line 219 is absent from this view; it presumably opens
// the conditional closed by the brace at 234 — confirm against the original file.
217  void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
218  primaryFileSequence_->readEvent(eventPrincipal);
// Position the secondary sequence on the same run / lumi / event.
220  bool found = secondaryFileSequence_->skipToItem(
221  eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
222  if (found) {
// The secondary principal is selected by the stream id value of the event being read.
223  EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
224  secondaryFileSequence_->readEvent(secondaryEventPrincipal);
225  checkConsistency(eventPrincipal, secondaryEventPrincipal);
226  checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
227  eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
228  eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
// Clear the secondary principal once its content has been recombined.
229  secondaryEventPrincipal.clearPrincipal();
230  } else {
231  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
232  << eventPrincipal.id() << " is not found in the secondary input files\n";
233  }
234  }
// When delayed reading is disabled, read all products from the source right away.
235  if (not delayReadingEventProducts_) {
236  eventPrincipal.readAllFromSourceAndMergeImmediately();
237  }
238  }
239 
240  bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
241  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
242  if (!found)
243  return false;
244  EventSourceSentry sentry(*this, streamContext);
245  readEvent_(eventPrincipal);
246  return true;
247  }
248 
253  InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
255  if (itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
256  if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
257  return IsSynchronize;
258  }
259  }
260  }
261  return runHelper_->nextItemType(state(), itemType);
262  }
263 
264  std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
265  return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
266  }
267 
268  // Rewind to before the first event that was read.
270 
271  // Advance "offset" events. Offset can be positive or negative (or zero).
272  void PoolSource::skip(int offset) { primaryFileSequence_->skipEvents(offset); }
273 
274  bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
275 
278 
279  std::vector<std::string> defaultStrings;
280  desc.setComment("Reads EDM/Root files.");
281  desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
282  desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
283  ->setComment("Names of secondary files to be processed.");
284  desc.addUntracked<bool>("needSecondaryFileNames", false)
285  ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
286  desc.addUntracked<std::string>("overrideCatalog", std::string());
287  desc.addUntracked<bool>("skipBadFiles", false)
288  ->setComment(
289  "True: Ignore any missing or unopenable input file.\n"
290  "False: Throw exception if missing or unopenable input file.");
291  desc.addUntracked<bool>("bypassVersionCheck", false)
292  ->setComment(
293  "True: Bypass release version check.\n"
294  "False: Throw exception if reading file in a release prior to the release in which the file was written.");
295  desc.addUntracked<int>("treeMaxVirtualSize", -1)
296  ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
297  desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
298  ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
299  desc.addUntracked<bool>("labelRawDataLikeMC", true)
300  ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
301  desc.addUntracked<bool>("delayReadingEventProducts", true)
302  ->setComment(
303  "If True: do not read a data product from the file until it is requested. If False: all event data "
304  "products are read upfront.");
305  ProductSelectorRules::fillDescription(desc, "inputCommands");
309 
310  descriptions.add("source", desc);
311  }
312 
313  bool PoolSource::randomAccess_() const { return true; }
314 
316 
318 } // namespace edm
size
Write out results.
edm::propagate_const< std::unique_ptr< RootSecondaryFileSequence > > secondaryFileSequence_
Definition: PoolSource.h:95
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:65
EventNumber_t event() const
Definition: EventID.h:41
void clearPrincipal()
Definition: Principal.cc:374
T getUntrackedParameter(std::string const &, T const &) const
bool goToEvent_(EventID const &eventID) override
Definition: PoolSource.cc:274
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
std::unique_ptr< SharedResourcesAcquirer > resourceSharedWithDelayedReaderPtr_
Definition: PoolSource.h:92
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
RunNumber_t run() const
edm::propagate_const< std::unique_ptr< RunHelperBase > > runHelper_
Definition: PoolSource.h:90
ProcessingController::ReverseState reverseState_() const override
Definition: PoolSource.cc:317
std::map< BranchKey, BranchDescription > ProductList
static void fillDescription(ParameterSetDescription &desc)
Definition: RunHelper.cc:158
edm::propagate_const< std::shared_ptr< RunPrincipal > > secondaryRunPrincipal_
Definition: PoolSource.h:76
#define nullptr
EventID const & id() const
unsigned long long EventNumber_t
LuminosityBlockAuxiliary const & aux() const
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:436
void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal) override
Definition: PoolSource.cc:195
LuminosityBlockIndex index() const
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
LuminosityBlockNumber_t luminosityBlock() const
RunNumber_t run() const
Definition: RunPrincipal.h:60
static void fillDescription(ParameterSetDescription &desc)
unsigned int LuminosityBlockNumber_t
void readAllFromSourceAndMergeImmediately(MergeableRunProductMetadata const *mergeableRunProductMetadata=0)
Definition: Principal.cc:904
unsigned int nStreams_
Definition: PoolSource.h:81
static constexpr EventNumber_t invalidEvent
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
Definition: PoolSource.cc:169
~PoolSource() override
Definition: PoolSource.cc:148
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_() override
Definition: PoolSource.cc:264
void mergeProvenanceRetrievers(EventPrincipal &other)
bool delayReadingEventProducts_
Definition: PoolSource.h:88
void setComment(std::string const &value)
void readRun_(RunPrincipal &runPrincipal) override
Definition: PoolSource.cc:173
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipal > > > secondaryEventPrincipals_
Definition: PoolSource.h:78
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
LuminosityBlockNumber_t luminosityBlock() const
void readEvent_(EventPrincipal &eventPrincipal) override
Definition: PoolSource.cc:217
static constexpr RunNumber_t invalidRun
static SharedResourcesRegistry * instance()
static void reportReadBranches()
Definition: InputFile.cc:106
StreamID streamID() const
std::shared_ptr< RunAuxiliary > readRunAuxiliary_() override
Definition: PoolSource.cc:167
bool randomAccess_() const override
Definition: PoolSource.cc:313
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:58
ItemType getNextItemType() override
Definition: PoolSource.cc:249
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
ItemType state() const
Definition: InputSource.h:328
std::unique_ptr< RunHelperBase > makeRunHelper(ParameterSet const &pset)
Definition: RunHelper.cc:12
InputFileCatalog secondaryCatalog_
Definition: PoolSource.h:75
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:441
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:276
unsigned int value() const
Definition: StreamID.h:42
static constexpr LuminosityBlockNumber_t invalidLumi
bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
Definition: PoolSource.cc:240
std::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:79
void add(std::string const &label, ParameterSetDescription const &psetDescription)
ProductList & productListUpdator()
void endJob() override
Definition: PoolSource.cc:150
RunIndex index() const
Definition: RunPrincipal.h:56
void skip(int offset) override
Definition: PoolSource.cc:272
HLT enums.
std::unique_ptr< FileBlock > readFile_() override
Definition: PoolSource.cc:157
std::shared_ptr< std::recursive_mutex > mutexSharedWithDelayedReader_
Definition: PoolSource.h:93
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:115
unsigned int RunNumber_t
void rewind_() override
Definition: PoolSource.cc:269
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:201
std::pair< SharedResourcesAcquirer, std::shared_ptr< std::recursive_mutex > > createAcquirerForSourceDelayedReader()
edm::propagate_const< std::shared_ptr< LuminosityBlockPrincipal > > secondaryLumiPrincipal_
Definition: PoolSource.h:77
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:841
edm::propagate_const< std::unique_ptr< RootPrimaryFileSequence > > primaryFileSequence_
Definition: PoolSource.h:94
def move(src, dest)
Definition: eostools.py:511
Definition: event.py:1
void closeFile_() override
Definition: PoolSource.cc:165
ProcessingController::ForwardState forwardState_() const override
Definition: PoolSource.cc:315