CMS 3D CMS Logo

PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
4 #include "InputFile.h"
7 #include "RunHelper.h"
26 
27 #include <set>
28 
29 namespace edm {
30 
31  class BranchID;
32  class LuminosityBlockID;
33  class EventID;
34  class ThinnedAssociationsHelper;
35 
36  namespace {
37  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
38  ProcessHistory const& ph1 = primary.processHistory();
39  ProcessHistory const& ph2 = secondary.processHistory();
40  if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
41  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
42  "The secondary file is not an ancestor of the primary file\n";
43  }
44  }
45  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
46  if(!isSameEvent(primary, secondary)) {
47  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
48  primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
49  }
50  }
51  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
52  if(primary.id() != secondary.id()) {
53  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
54  primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
55  }
56  }
57  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
58  if(primary.id() != secondary.id()) {
59  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
60  primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
61  }
62  }
63  }
64 
// PoolSource constructor: member-initializer list followed by the body.
// NOTE(review): this listing omits several original lines (the signature on 65,
// plus 91, 93, 98, 99, 104 and 113). `resources` and `fullList` are declared on
// lines not shown here, and the extra closing brace at 145 matches an opening
// brace on a missing line (presumably a check that a secondary file sequence
// exists; confirm against the real source).
66  InputSource(pset, desc),
67  rootServiceChecker_(),
// Both catalogs are built with the same optional "overrideCatalog" setting;
// "secondaryFileNames" defaults to an empty list.
68  catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
69  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
70  secondaryCatalog_(pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
71  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
72  secondaryRunPrincipal_(),
73  secondaryLumiPrincipal_(),
74  secondaryEventPrincipals_(),
75  branchIDsToReplace_(),
76  nStreams_(desc.allocations_->numberOfStreams()),
77  skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
78  bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
79  treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
80  productSelectorRules_(pset, "inputCommands", "InputSource"),
81  dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
82  labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
83  runHelper_(makeRunHelper(pset)),
84  resourceSharedWithDelayedReaderPtr_(),
85  // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
86  // initialized previously in their own initialization.
87  primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
// No secondary sequence is created when the secondary catalog is empty.
88  secondaryFileSequence_(secondaryCatalog_.empty() ? nullptr :
89  new RootSecondaryFileSequence(pset, *this, secondaryCatalog_))
90  {
// Moves the first element of `resources` (declared on a line missing from
// this listing) into a heap-allocated SharedResourcesAcquirer.
92  resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
94 
// A configuration that requires secondary files but lists none is rejected.
95  if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
96  throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
97  }
// Create one secondary EventPrincipal per stream index, built from the
// secondary file sequence's product registry and branch-ID-list helper.
100  for(unsigned int index = 0; index < nStreams_; ++index) {
101  secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
102  secondaryFileSequence_->fileBranchIDListHelper(),
103  std::make_shared<ThinnedAssociationsHelper const>(),
105  nullptr,
106  index));
107  }
// Per branch type, collect the branch IDs that are present in the secondary
// files; ThinnedAssociation event branches are additionally tracked in
// associationsFromSecondary.
108  std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
109  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
110  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
111  std::set<BranchID> associationsFromSecondary;
112  //this is the registry used by the 'outside' world and only has the primary file information in it at present
114  for(auto const& item : secondary) {
115  if(item.second.present()) {
116  idsToReplace[item.second.branchType()].insert(item.second.branchID());
117  if(item.second.branchType() == InEvent &&
118  item.second.unwrappedType() == typeid(ThinnedAssociation)) {
119  associationsFromSecondary.insert(item.second.branchID());
120  }
121  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
122  auto itFound = fullList.find(item.first);
123  if(itFound != fullList.end()) {
124  itFound->second.setDropped(false);
125  }
126  }
127  }
// Branches that are also present in the primary files are removed again from
// both collections built above.
128  for(auto const& item : primary) {
129  if(item.second.present()) {
130  idsToReplace[item.second.branchType()].erase(item.second.branchID());
131  associationsFromSecondary.erase(item.second.branchID());
132  }
133  }
// If no event, lumi or run branch remains to be taken from the secondary
// files, the secondary sequence is discarded altogether.
134  if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
135  secondaryFileSequence_ = nullptr; // propagate_const<T> has no reset() function
136  } else {
// Copy each set into the matching branchIDsToReplace_ vector (in set
// iteration order), then hand the association branch IDs to the secondary sequence.
137  for(int i = InEvent; i < NumBranchTypes; ++i) {
138  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
139  for(auto const& id : idsToReplace[i]) {
140  branchIDsToReplace_[i].push_back(id);
141  }
142  }
143  secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
144  }
145  }
146  }
147 
149 
150  void
153  primaryFileSequence_->endJob();
155  }
156 
157  std::unique_ptr<FileBlock>
159  std::unique_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
161  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
162  }
163  return fb;
164  }
165 
167  primaryFileSequence_->closeFile_();
168  }
169 
170  std::shared_ptr<RunAuxiliary>
172  return primaryFileSequence_->readRunAuxiliary_();
173  }
174 
175  std::shared_ptr<LuminosityBlockAuxiliary>
177  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
178  }
179 
// readRun_: reads the run into runPrincipal from the primary file sequence,
// then merges in the matching run from the secondary file sequence.
// NOTE(review): original lines 181 (signature), 183 and 190 are missing from
// this listing. Line 183 presumably opens the block closed at 201 (likely a
// check that a secondary sequence exists); confirm against the real source.
180  void
182  primaryFileSequence_->readRun_(runPrincipal);
// Position the secondary sequence on the same run (lumi and event passed as 0).
184  bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
185  if(found) {
// The run auxiliaries of both sequences must agree before anything is merged.
186  std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
187  checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
188  secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryAuxiliary,
189  secondaryFileSequence_->fileProductRegistry(),
191  nullptr,
192  runPrincipal.index());
193  secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
194  checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
// Recombine the primary principal with the secondary one for the run-level
// branch IDs in branchIDsToReplace_.
195  runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
196  } else {
// A run that cannot be located in the secondary files is an error.
197  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
198  << " Run " << runPrincipal.run()
199  << " is not found in the secondary input files\n";
200  }
201  }
202  }
203 
// readLuminosityBlock_: reads the luminosity block into lumiPrincipal from the
// primary file sequence, then merges in the matching luminosity block from
// the secondary file sequence.
// NOTE(review): original lines 205 (signature), 207 and 214 are missing from
// this listing. Line 207 presumably opens the block closed at 225 (likely a
// check that a secondary sequence exists); confirm against the real source.
204  void
206  primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
// Position the secondary sequence on the same run and lumi (event passed as 0).
208  bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
209  if(found) {
// The lumi auxiliaries of both sequences must agree before anything is merged.
210  std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
211  checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
212  secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(secondaryAuxiliary,
213  secondaryFileSequence_->fileProductRegistry(),
215  nullptr,
216  lumiPrincipal.index());
217  secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
218  checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
// Recombine the primary principal with the secondary one for the lumi-level
// branch IDs in branchIDsToReplace_.
219  lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
220  } else {
// A luminosity block that cannot be located in the secondary files is an error.
221  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
222  << " Run " << lumiPrincipal.run()
223  << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
224  << " is not found in the secondary input files\n";
225  }
226  }
227  }
228 
// readEvent_: reads the event into eventPrincipal from the primary file
// sequence, then merges in the matching event from the secondary file sequence.
// NOTE(review): original lines 230 (signature) and 232 are missing from this
// listing. Line 232 presumably opens the block closed at 248 (likely a check
// that a secondary sequence exists); confirm against the real source.
229  void
231  primaryFileSequence_->readEvent(eventPrincipal);
// Position the secondary sequence on the same run, lumi and event number.
233  bool found = secondaryFileSequence_->skipToItem(eventPrincipal.run(),
234  eventPrincipal.luminosityBlock(),
235  eventPrincipal.id().event());
236  if(found) {
// Use the secondary principal stored at the index given by this event's stream ID.
237  EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
238  secondaryFileSequence_->readEvent(secondaryEventPrincipal);
239  checkConsistency(eventPrincipal, secondaryEventPrincipal);
240  checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
// Recombine for the event-level branch IDs, merge the provenance retrievers,
// then clear the secondary principal.
241  eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
242  eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
243  secondaryEventPrincipal.clearPrincipal();
244  } else {
// An event that cannot be located in the secondary files is an error.
245  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
246  eventPrincipal.id() << " is not found in the secondary input files\n";
247  }
248  }
249  }
250 
251  bool
252  PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
253  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
254  if(!found) return false;
255  EventSourceSentry sentry(*this, streamContext);
256  readEvent_(eventPrincipal);
257  return true;
258  }
259 
265  InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
267  if(itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
268  if(!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
269  return IsSynchronize;
270  }
271  }
272  }
273  return runHelper_->nextItemType(state(), itemType);
274  }
275 
276  void
278  primaryFileSequence_->closeFile_();
279  }
280 
281  std::pair<SharedResourcesAcquirer*,std::recursive_mutex*>
283  return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
284  }
285 
286  // Rewind to before the first event that was read.
287  void
289  primaryFileSequence_->rewind_();
290  }
291 
292  // Advance "offset" events. Offset can be positive or negative (or zero).
293  void
295  primaryFileSequence_->skipEvents(offset);
296  }
297 
298  bool
299  PoolSource::goToEvent_(EventID const& eventID) {
300  return primaryFileSequence_->goToEvent(eventID);
301  }
302 
303  void
305 
307 
308  std::vector<std::string> defaultStrings;
309  desc.setComment("Reads EDM/Root files.");
310  desc.addUntracked<std::vector<std::string> >("fileNames")
311  ->setComment("Names of files to be processed.");
312  desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
313  ->setComment("Names of secondary files to be processed.");
314  desc.addUntracked<bool>("needSecondaryFileNames", false)
315  ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
316  desc.addUntracked<std::string>("overrideCatalog", std::string());
317  desc.addUntracked<bool>("skipBadFiles", false)
318  ->setComment("True: Ignore any missing or unopenable input file.\n"
319  "False: Throw exception if missing or unopenable input file.");
320  desc.addUntracked<bool>("bypassVersionCheck", false)
321  ->setComment("True: Bypass release version check.\n"
322  "False: Throw exception if reading file in a release prior to the release in which the file was written.");
323  desc.addUntracked<int>("treeMaxVirtualSize", -1)
324  ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
325  desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
326  ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
327  desc.addUntracked<bool>("labelRawDataLikeMC", true)
328  ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
329  ProductSelectorRules::fillDescription(desc, "inputCommands");
333 
334  descriptions.add("source", desc);
335  }
336 
337  bool
339  return true;
340  }
341 
344  return primaryFileSequence_->forwardState();
345  }
346 
349  return primaryFileSequence_->reverseState();
350  }
351 }
size
Write out results.
virtual void preForkReleaseResources() override
Definition: PoolSource.cc:277
edm::propagate_const< std::unique_ptr< RootSecondaryFileSequence > > secondaryFileSequence_
Definition: PoolSource.h:92
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:65
EventNumber_t event() const
Definition: EventID.h:41
void clearPrincipal()
Definition: Principal.cc:299
T getUntrackedParameter(std::string const &, T const &) const
virtual bool goToEvent_(EventID const &eventID) override
Definition: PoolSource.cc:299
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
std::unique_ptr< SharedResourcesAcquirer > resourceSharedWithDelayedReaderPtr_
Definition: PoolSource.h:89
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:356
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
RunNumber_t run() const
edm::propagate_const< std::unique_ptr< RunHelperBase > > runHelper_
Definition: PoolSource.h:88
virtual ProcessingController::ReverseState reverseState_() const override
Definition: PoolSource.cc:348
std::map< BranchKey, BranchDescription > ProductList
static void fillDescription(ParameterSetDescription &desc)
Definition: RunHelper.cc:174
edm::propagate_const< std::shared_ptr< RunPrincipal > > secondaryRunPrincipal_
Definition: PoolSource.h:75
EventID const & id() const
unsigned long long EventNumber_t
LuminosityBlockAuxiliary const & aux() const
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:586
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal) override
Definition: PoolSource.cc:205
LuminosityBlockIndex index() const
static void fillDescription(ParameterSetDescription &desc, char const *parameterName, std::vector< std::string > const &defaultStrings=defaultSelectionStrings())
LuminosityBlockNumber_t luminosityBlock() const
RunNumber_t run() const
Definition: RunPrincipal.h:61
static void fillDescription(ParameterSetDescription &desc)
#define nullptr
unsigned int LuminosityBlockNumber_t
unsigned int nStreams_
Definition: PoolSource.h:80
static EventNumber_t const invalidEvent
virtual std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
Definition: PoolSource.cc:176
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_() override
Definition: PoolSource.cc:282
void mergeProvenanceRetrievers(EventPrincipal &other)
void setComment(std::string const &value)
virtual void readRun_(RunPrincipal &runPrincipal) override
Definition: PoolSource.cc:181
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipal > > > secondaryEventPrincipals_
Definition: PoolSource.h:77
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
LuminosityBlockNumber_t luminosityBlock() const
virtual void readEvent_(EventPrincipal &eventPrincipal) override
Definition: PoolSource.cc:230
static RunNumber_t const invalidRun
static SharedResourcesRegistry * instance()
static void reportReadBranches()
Definition: InputFile.cc:120
StreamID streamID() const
virtual std::shared_ptr< RunAuxiliary > readRunAuxiliary_() override
Definition: PoolSource.cc:171
virtual bool randomAccess_() const override
Definition: PoolSource.cc:338
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:57
virtual ~PoolSource()
Definition: PoolSource.cc:148
virtual ItemType getNextItemType() override
Definition: PoolSource.cc:261
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
ItemType state() const
Definition: InputSource.h:358
std::unique_ptr< RunHelperBase > makeRunHelper(ParameterSet const &pset)
Definition: RunHelper.cc:13
InputFileCatalog secondaryCatalog_
Definition: PoolSource.h:74
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:592
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:304
unsigned int value() const
Definition: StreamID.h:46
static LuminosityBlockNumber_t const invalidLumi
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
Definition: PoolSource.cc:252
std::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:78
void add(std::string const &label, ParameterSetDescription const &psetDescription)
ProductList & productListUpdator()
virtual void endJob() override
Definition: PoolSource.cc:151
RunIndex index() const
Definition: RunPrincipal.h:53
virtual void skip(int offset) override
Definition: PoolSource.cc:294
HLT enums.
virtual std::unique_ptr< FileBlock > readFile_() override
Definition: PoolSource.cc:158
std::shared_ptr< std::recursive_mutex > mutexSharedWithDelayedReader_
Definition: PoolSource.h:90
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:127
unsigned int RunNumber_t
virtual void rewind_() override
Definition: PoolSource.cc:288
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:216
std::pair< SharedResourcesAcquirer, std::shared_ptr< std::recursive_mutex > > createAcquirerForSourceDelayedReader()
edm::propagate_const< std::shared_ptr< LuminosityBlockPrincipal > > secondaryLumiPrincipal_
Definition: PoolSource.h:76
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:760
edm::propagate_const< std::unique_ptr< RootPrimaryFileSequence > > primaryFileSequence_
Definition: PoolSource.h:91
def move(src, dest)
Definition: eostools.py:510
Definition: event.py:1
virtual void closeFile_() override
Definition: PoolSource.cc:166
virtual ProcessingController::ForwardState forwardState_() const override
Definition: PoolSource.cc:343