CMS 3D CMS Logo

PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
4 #include "InputFile.h"
7 #include "RunHelper.h"
26 
27 #include <set>
28 
29 namespace edm {
30 
31  class BranchID;
32  class LuminosityBlockID;
33  class EventID;
34  class ThinnedAssociationsHelper;
35 
36  namespace {
// Compares the process histories of two principals, one read from the primary
// input and one from the secondary input.
// Passes when the histories compare equal or when isAncestor(ph2, ph1) holds;
// otherwise throws an Exception in category errors::MismatchedInputFiles.
37  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
38  ProcessHistory const& ph1 = primary.processHistory();
39  ProcessHistory const& ph2 = secondary.processHistory();
40  if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
// NOTE(review): the context string below names checkConsistency, not
// checkHistoryConsistency; confirm whether that is intentional.
41  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
42  "The secondary file is not an ancestor of the primary file\n";
43  }
44  }
// Event overload: throws an Exception (errors::MismatchedInputFiles) naming the
// primary event's id when isSameEvent(primary, secondary) is false.
45  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
46  if(!isSameEvent(primary, secondary)) {
47  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
48  primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
49  }
50  }
// Luminosity-block overload: only the ids of the two auxiliaries are compared.
// Throws an Exception (errors::MismatchedInputFiles) when they differ.
51  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
52  if(primary.id() != secondary.id()) {
53  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
54  primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
55  }
56  }
// Run overload: only the ids of the two auxiliaries are compared.
// Throws an Exception (errors::MismatchedInputFiles) when they differ.
57  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
58  if(primary.id() != secondary.id()) {
59  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
60  primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
61  }
62  }
63  }
64 
// Constructor body of PoolSource. The signature line (original line 65) is
// absent from this listing; the page's symbol index gives it as
// PoolSource(ParameterSet const& pset, InputSourceDescription const& desc).
// Visible behaviour:
//  - builds the primary and secondary file catalogs from untracked parameters;
//  - always creates the primary file sequence, and creates the secondary one
//    only when the secondary catalog is non-empty;
//  - throws errors::Configuration when 'needSecondaryFileNames' is true but no
//    secondary files were given;
//  - works out which branches exist only in the secondary files and records
//    them in branchIDsToReplace_.
// NOTE(review): original lines 92, 94, 99, 100, 105 and 114 are also missing
// here, so `resources` and `fullList` are used below without a visible
// declaration.
66  InputSource(pset, desc),
67  rootServiceChecker_(),
68  catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
69  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
70  secondaryCatalog_(pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
71  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
72  secondaryRunPrincipal_(),
73  secondaryLumiPrincipal_(),
74  secondaryEventPrincipals_(),
75  branchIDsToReplace_(),
76  nStreams_(desc.allocations_->numberOfStreams()),
77  skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
78  bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
79  treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
80  productSelectorRules_(pset, "inputCommands", "InputSource"),
81  dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
82  labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
83  delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
84  runHelper_(makeRunHelper(pset)),
85  resourceSharedWithDelayedReaderPtr_(),
86  // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
87  // initialized previously in their own initialization.
88  primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
89  secondaryFileSequence_(secondaryCatalog_.empty() ? nullptr :
90  new RootSecondaryFileSequence(pset, *this, secondaryCatalog_))
91  {
// NOTE(review): `resources` is declared on the omitted line 92; the symbol
// index lists createAcquirerForSourceDelayedReader() returning a pair, which
// is presumably its origin. Confirm against the real file.
93  resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
95 
96  if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
97  throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
98  }
// NOTE(review): lines 99-100 are missing. The extra closing brace at line 146
// shows a block was opened here, presumably a null check on
// secondaryFileSequence_, since everything below dereferences it.
// One secondary EventPrincipal is created per stream index.
// NOTE(review): a raw `new` is handed to emplace_back; if the vector had to
// grow and that allocation threw, the new object would leak. Confirm capacity
// is reserved on an omitted line.
101  for(unsigned int index = 0; index < nStreams_; ++index) {
102  secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
103  secondaryFileSequence_->fileBranchIDListHelper(),
104  std::make_shared<ThinnedAssociationsHelper const>(),
106  nullptr,
107  index));
108  }
// Per branch type, the set of branch IDs to be taken from the secondary files.
109  std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
110  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
111  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
112  std::set<BranchID> associationsFromSecondary;
113  //this is the registry used by the 'outside' world and only has the primary file information in it at present
// NOTE(review): the declaration of `fullList` (line 114) that the comment
// above refers to is missing from this listing.
// Step 1: start from every branch present in the secondary files.
115  for(auto const& item : secondary) {
116  if(item.second.present()) {
117  idsToReplace[item.second.branchType()].insert(item.second.branchID());
// Event-level ThinnedAssociation branches are tracked separately as well.
118  if(item.second.branchType() == InEvent &&
119  item.second.unwrappedType() == typeid(ThinnedAssociation)) {
120  associationsFromSecondary.insert(item.second.branchID());
121  }
122  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
123  auto itFound = fullList.find(item.first);
124  if(itFound != fullList.end()) {
125  itFound->second.setDropped(false);
126  }
127  }
128  }
// Step 2: remove every branch that is also present in the primary files, so
// only secondary-only branches remain.
129  for(auto const& item : primary) {
130  if(item.second.present()) {
131  idsToReplace[item.second.branchType()].erase(item.second.branchID());
132  associationsFromSecondary.erase(item.second.branchID());
133  }
134  }
// Nothing to take from the secondary files for events, lumis or runs: drop
// the secondary file sequence altogether.
135  if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
136  secondaryFileSequence_ = nullptr; // propagate_const<T> has no reset() function
137  } else {
// Copy each per-type set into the member vectors used by the read*_ methods.
138  for(int i = InEvent; i < NumBranchTypes; ++i) {
139  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
140  for(auto const& id : idsToReplace[i]) {
141  branchIDsToReplace_[i].push_back(id);
142  }
143  }
144  secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
145  }
146  }
147  }
148 
150 
// End-of-job hook; the visible statement forwards to the primary file sequence.
// NOTE(review): original lines 152-153 (signature; the symbol index lists
// `void endJob() override` at line 152) and 155 are missing from this listing.
151  void
154  primaryFileSequence_->endJob();
156  }
157 
// Returns the FileBlock produced by the primary file sequence.
// NOTE(review): the signature (line 159, `readFile_() override` per the symbol
// index) and the condition on line 161 are missing. The flag name suggests the
// block is marked not fast-clonable when a secondary file sequence exists;
// confirm against the real file.
158  std::unique_ptr<FileBlock>
160  std::unique_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
162  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
163  }
164  return fb;
165  }
166 
// Body of closeFile_ (signature line 167 is missing; the symbol index lists
// `void closeFile_() override`). Forwards to the primary file sequence.
168  primaryFileSequence_->closeFile_();
169  }
170 
// Returns the run auxiliary read by the primary file sequence.
// NOTE(review): signature line 172 is missing (`readRunAuxiliary_() override`
// per the symbol index).
171  std::shared_ptr<RunAuxiliary>
173  return primaryFileSequence_->readRunAuxiliary_();
174  }
175 
// Returns the luminosity-block auxiliary read by the primary file sequence.
// NOTE(review): signature line 177 is missing
// (`readLuminosityBlockAuxiliary_() override` per the symbol index).
176  std::shared_ptr<LuminosityBlockAuxiliary>
178  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
179  }
180 
// Reads a run from the primary files and, when the matching run is found in
// the secondary files, merges the secondary-only run branches into it.
// Throws errors::MismatchedInputFiles when the run is absent from the
// secondary files or when the consistency checks fail.
// NOTE(review): the signature (line 182; the symbol index gives
// `void readRun_(RunPrincipal& runPrincipal) override`) and line 184 are
// missing. The extra closing brace at line 202 indicates line 184 opened a
// block, presumably a check that secondaryFileSequence_ is set.
181  void
183  primaryFileSequence_->readRun_(runPrincipal);
// Position the secondary sequence on the same run (lumi and event given as 0).
185  bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
186  if(found) {
187  std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
188  checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
// NOTE(review): one constructor argument (line 191) is missing from this listing.
189  secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryAuxiliary,
190  secondaryFileSequence_->fileProductRegistry(),
192  nullptr,
193  runPrincipal.index());
194  secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
195  checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
// Recombine with the run-level branch IDs computed in the constructor.
196  runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
197  } else {
198  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
199  << " Run " << runPrincipal.run()
200  << " is not found in the secondary input files\n";
201  }
202  }
203  }
204 
// Reads a luminosity block from the primary files and, when the matching block
// is found in the secondary files, merges the secondary-only lumi branches
// into it.
// Throws errors::MismatchedInputFiles when the block is absent from the
// secondary files or when the consistency checks fail.
// NOTE(review): the signature (line 206; the symbol index gives
// `void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override`)
// and line 208 are missing. The extra closing brace at line 228 indicates
// line 208 opened a block, presumably a check that secondaryFileSequence_ is set.
205  void
207  primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
// Position the secondary sequence on the same run and lumi (event given as 0).
209  bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
210  if(found) {
211  std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
212  checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
// NOTE(review): one constructor argument (line 215) is missing from this listing.
213  secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
214  secondaryFileSequence_->fileProductRegistry(),
216  nullptr,
217  lumiPrincipal.index());
// Unlike the run case, the auxiliary is attached after construction.
218  secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
219  secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
220  checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
221  lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
222  } else {
223  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
224  << " Run " << lumiPrincipal.run()
225  << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
226  << " is not found in the secondary input files\n";
227  }
228  }
229  }
230 
// Reads an event from the primary files and, when the matching event is found
// in the secondary files, merges the secondary-only event branches and the
// provenance retrievers into it.
// Throws errors::MismatchedInputFiles when the event is absent from the
// secondary files or when the consistency checks fail.
// NOTE(review): the signature (line 232; the symbol index gives
// `void readEvent_(EventPrincipal& eventPrincipal) override`) and the guard
// lines 234 and 251 are missing from this listing.
231  void
233  primaryFileSequence_->readEvent(eventPrincipal);
// NOTE(review): line 234 presumably checks that secondaryFileSequence_ is set
// (see the extra closing brace at line 250); confirm.
235  bool found = secondaryFileSequence_->skipToItem(eventPrincipal.run(),
236  eventPrincipal.luminosityBlock(),
237  eventPrincipal.id().event());
238  if(found) {
// Use the secondary principal created in the constructor for this stream.
239  EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
240  secondaryFileSequence_->readEvent(secondaryEventPrincipal);
241  checkConsistency(eventPrincipal, secondaryEventPrincipal);
242  checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
243  eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
244  eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
// The per-stream secondary principal is cleared once its content is merged.
245  secondaryEventPrincipal.clearPrincipal();
246  } else {
247  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
248  eventPrincipal.id() << " is not found in the secondary input files\n";
249  }
250  }
// NOTE(review): line 251 opens the block closed at line 253; it presumably
// tests delayReadingEventProducts_ (parameter text: "If False: all event data
// products are read upfront"). Confirm.
252  eventPrincipal.readAllFromSourceAndMergeImmediately();
253  }
254  }
255 
// Reads the event identified by `id` into `eventPrincipal`.
// Returns false without reading anything when the primary file sequence cannot
// skip to that run/lumi/event; otherwise reads via readEvent_ and returns true.
256  bool
257  PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
258  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
259  if(!found) return false;
// The sentry object stays alive for the duration of the read below.
260  EventSourceSentry sentry(*this, streamContext);
261  readEvent_(eventPrincipal);
262  return true;
263  }
264 
// Body of getNextItemType. The symbol index lists
// `ItemType getNextItemType() override` at line 266.
// Asks the primary file sequence for the next item type, returns IsSynchronize
// when a run/lumi/event item is not contained in the secondary sequence's
// current file, and otherwise returns the run helper's decision.
// NOTE(review): lines 265-269 (signature and the declarations of `run`, `lumi`
// and `event`) and line 271 are missing from this listing. Line 271 opens the
// block closed at line 277, presumably a check that secondaryFileSequence_ is set.
270  InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
272  if(itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
273  if(!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
274  return IsSynchronize;
275  }
276  }
277  }
278  return runHelper_->nextItemType(state(), itemType);
279  }
280 
// Returns non-owning raw pointers to the shared-resources acquirer and the
// recursive mutex held by this source; both come from .get() on the members.
// NOTE(review): signature line 282 is missing
// (`resourceSharedWithDelayedReader_() override` per the symbol index).
281  std::pair<SharedResourcesAcquirer*,std::recursive_mutex*>
283  return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
284  }
285 
// Forwards to the primary file sequence; the secondary sequence is not touched
// in the visible code.
// NOTE(review): signature line 288 is missing (`void rewind_() override` per
// the symbol index).
286  // Rewind to before the first event that was read.
287  void
289  primaryFileSequence_->rewind_();
290  }
291 
// Forwards the offset to the primary file sequence's skipEvents.
// NOTE(review): signature line 294 is missing (`void skip(int offset) override`
// per the symbol index).
292  // Advance "offset" events. Offset can be positive or negative (or zero).
293  void
295  primaryFileSequence_->skipEvents(offset);
296  }
297 
// Asks the primary file sequence to go to `eventID` and returns its result
// unchanged.
298  bool
299  PoolSource::goToEvent_(EventID const& eventID) {
300  return primaryFileSequence_->goToEvent(eventID);
301  }
302 
// Declares the configuration parameters accepted by this source, with their
// defaults and help text, and registers the description as "source".
// NOTE(review): the signature (line 304; the symbol index gives
// `static void fillDescriptions(ConfigurationDescriptions& descriptions)`),
// the declaration of `desc` (line 306) and lines 331-333 are missing from this
// listing.
303  void
305 
307 
// Empty default used for 'secondaryFileNames'.
308  std::vector<std::string> defaultStrings;
309  desc.setComment("Reads EDM/Root files.");
// 'fileNames' is the only parameter here declared without a default value.
310  desc.addUntracked<std::vector<std::string> >("fileNames")
311  ->setComment("Names of files to be processed.");
312  desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
313  ->setComment("Names of secondary files to be processed.");
314  desc.addUntracked<bool>("needSecondaryFileNames", false)
315  ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
316  desc.addUntracked<std::string>("overrideCatalog", std::string());
317  desc.addUntracked<bool>("skipBadFiles", false)
318  ->setComment("True: Ignore any missing or unopenable input file.\n"
319  "False: Throw exception if missing or unopenable input file.");
320  desc.addUntracked<bool>("bypassVersionCheck", false)
321  ->setComment("True: Bypass release version check.\n"
322  "False: Throw exception if reading file in a release prior to the release in which the file was written.");
323  desc.addUntracked<int>("treeMaxVirtualSize", -1)
324  ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
325  desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
326  ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
327  desc.addUntracked<bool>("labelRawDataLikeMC", true)
328  ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
329  desc.addUntracked<bool>("delayReadingEventProducts",true)->setComment("If True: do not read a data product from the file until it is requested. If False: all event data products are read upfront.");
// Adds the 'inputCommands' product-selection parameter.
330  ProductSelectorRules::fillDescription(desc, "inputCommands");
334 
335  descriptions.add("source", desc);
336  }
337 
// Always returns true.
// NOTE(review): signature line 339 is missing
// (`bool randomAccess_() const override` per the symbol index).
338  bool
340  return true;
341  }
342 
// Body of forwardState_; returns the primary file sequence's forward state.
// NOTE(review): lines 343-344 are missing; the symbol index gives the signature
// `ProcessingController::ForwardState forwardState_() const override`.
345  return primaryFileSequence_->forwardState();
346  }
347 
// Body of reverseState_; returns the primary file sequence's reverse state.
// NOTE(review): lines 348-349 are missing; the symbol index gives the signature
// `ProcessingController::ReverseState reverseState_() const override`.
350  return primaryFileSequence_->reverseState();
351  }
352 }
size
Write out results.
edm::propagate_const< std::unique_ptr< RootSecondaryFileSequence > > secondaryFileSequence_
Definition: PoolSource.h:93
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:65
EventNumber_t event() const
Definition: EventID.h:41
void clearPrincipal()
Definition: Principal.cc:333
T getUntrackedParameter(std::string const &, T const &) const
bool goToEvent_(EventID const &eventID) override
Definition: PoolSource.cc:299
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
std::unique_ptr< SharedResourcesAcquirer > resourceSharedWithDelayedReaderPtr_
Definition: PoolSource.h:90
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:334
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
RunNumber_t run() const
edm::propagate_const< std::unique_ptr< RunHelperBase > > runHelper_
Definition: PoolSource.h:89
ProcessingController::ReverseState reverseState_() const override
Definition: PoolSource.cc:349
std::map< BranchKey, BranchDescription > ProductList
static void fillDescription(ParameterSetDescription &desc)
Definition: RunHelper.cc:174
edm::propagate_const< std::shared_ptr< RunPrincipal > > secondaryRunPrincipal_
Definition: PoolSource.h:75
EventID const & id() const
unsigned long long EventNumber_t
LuminosityBlockAuxiliary const & aux() const
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:497
void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal) override
Definition: PoolSource.cc:206
LuminosityBlockIndex index() const
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
LuminosityBlockNumber_t luminosityBlock() const
RunNumber_t run() const
Definition: RunPrincipal.h:61
static void fillDescription(ParameterSetDescription &desc)
#define nullptr
unsigned int LuminosityBlockNumber_t
unsigned int nStreams_
Definition: PoolSource.h:80
static constexpr EventNumber_t invalidEvent
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
Definition: PoolSource.cc:177
~PoolSource() override
Definition: PoolSource.cc:149
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_() override
Definition: PoolSource.cc:282
void readAllFromSourceAndMergeImmediately()
Definition: Principal.cc:909
void mergeProvenanceRetrievers(EventPrincipal &other)
bool delayReadingEventProducts_
Definition: PoolSource.h:87
void setComment(std::string const &value)
void readRun_(RunPrincipal &runPrincipal) override
Definition: PoolSource.cc:182
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipal > > > secondaryEventPrincipals_
Definition: PoolSource.h:77
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
LuminosityBlockNumber_t luminosityBlock() const
void readEvent_(EventPrincipal &eventPrincipal) override
Definition: PoolSource.cc:232
static constexpr RunNumber_t invalidRun
static SharedResourcesRegistry * instance()
static void reportReadBranches()
Definition: InputFile.cc:122
StreamID streamID() const
std::shared_ptr< RunAuxiliary > readRunAuxiliary_() override
Definition: PoolSource.cc:172
bool randomAccess_() const override
Definition: PoolSource.cc:339
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:57
ItemType getNextItemType() override
Definition: PoolSource.cc:266
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
ItemType state() const
Definition: InputSource.h:336
std::unique_ptr< RunHelperBase > makeRunHelper(ParameterSet const &pset)
Definition: RunHelper.cc:13
InputFileCatalog secondaryCatalog_
Definition: PoolSource.h:74
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:503
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:304
unsigned int value() const
Definition: StreamID.h:46
static constexpr LuminosityBlockNumber_t invalidLumi
bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
Definition: PoolSource.cc:257
std::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:78
void add(std::string const &label, ParameterSetDescription const &psetDescription)
ProductList & productListUpdator()
void endJob() override
Definition: PoolSource.cc:152
RunIndex index() const
Definition: RunPrincipal.h:53
void skip(int offset) override
Definition: PoolSource.cc:294
HLT enums.
std::unique_ptr< FileBlock > readFile_() override
Definition: PoolSource.cc:159
std::shared_ptr< std::recursive_mutex > mutexSharedWithDelayedReader_
Definition: PoolSource.h:91
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:122
unsigned int RunNumber_t
void rewind_() override
Definition: PoolSource.cc:288
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:208
std::pair< SharedResourcesAcquirer, std::shared_ptr< std::recursive_mutex > > createAcquirerForSourceDelayedReader()
edm::propagate_const< std::shared_ptr< LuminosityBlockPrincipal > > secondaryLumiPrincipal_
Definition: PoolSource.h:76
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:838
edm::propagate_const< std::unique_ptr< RootPrimaryFileSequence > > primaryFileSequence_
Definition: PoolSource.h:92
def move(src, dest)
Definition: eostools.py:510
Definition: event.py:1
void closeFile_() override
Definition: PoolSource.cc:167
ProcessingController::ForwardState forwardState_() const override
Definition: PoolSource.cc:344