CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
4 #include "InputFile.h"
25 
26 #include <set>
27 
28 namespace edm {
29 
30  class BranchID;
31  class LuminosityBlockID;
32  class EventID;
33  class ThinnedAssociationsHelper;
34 
35  namespace {
36  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
37  ProcessHistory const& ph1 = primary.processHistory();
38  ProcessHistory const& ph2 = secondary.processHistory();
39  if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
40  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
41  "The secondary file is not an ancestor of the primary file\n";
42  }
43  }
44  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
45  if(!isSameEvent(primary, secondary)) {
46  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
47  primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
48  }
49  }
50  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
51  if(primary.id() != secondary.id()) {
52  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
53  primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
54  }
55  }
56  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
57  if(primary.id() != secondary.id()) {
58  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
59  primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
60  }
61  }
62  }
63 
65  InputSource(pset, desc),
66  rootServiceChecker_(),
67  catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
68  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
69  secondaryCatalog_(pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
70  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
71  primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_, desc.allocations_->numberOfStreams())),
72  secondaryFileSequence_(secondaryCatalog_.empty() ? nullptr :
73  new RootSecondaryFileSequence(pset, *this, secondaryCatalog_, desc.allocations_->numberOfStreams())),
74  secondaryRunPrincipal_(),
75  secondaryLumiPrincipal_(),
76  secondaryEventPrincipals_(),
77  branchIDsToReplace_(),
79  {
80  if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
81  throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
82  }
83  if(secondaryFileSequence_) {
84  unsigned int nStreams = desc.allocations_->numberOfStreams();
85  secondaryEventPrincipals_.reserve(nStreams);
86  for(unsigned int index = 0; index < nStreams; ++index) {
87  secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
88  secondaryFileSequence_->fileBranchIDListHelper(),
89  std::make_shared<ThinnedAssociationsHelper const>(),
91  nullptr,
92  index));
93  }
94  std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
95  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
96  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
97  std::set<BranchID> associationsFromSecondary;
98  typedef ProductRegistry::ProductList::const_iterator const_iterator;
99  typedef ProductRegistry::ProductList::iterator iterator;
100  //this is the registry used by the 'outside' world and only has the primary file information in it at present
101  ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
102  for(const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
103  if(it->second.present()) {
104  idsToReplace[it->second.branchType()].insert(it->second.branchID());
105  if(it->second.branchType() == InEvent &&
106  it->second.unwrappedType() == typeid(ThinnedAssociation)) {
107  associationsFromSecondary.insert(it->second.branchID());
108  }
109  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
110  iterator itFound = fullList.find(it->first);
111  if(itFound != fullList.end()) {
112  itFound->second.setDropped(false);
113  }
114  }
115  }
116  for(const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
117  if(it->second.present()) {
118  idsToReplace[it->second.branchType()].erase(it->second.branchID());
119  associationsFromSecondary.erase(it->second.branchID());
120  }
121  }
122  if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
123  secondaryFileSequence_.reset();
124  } else {
125  for(int i = InEvent; i < NumBranchTypes; ++i) {
126  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
127  for(std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
128  it != itEnd; ++it) {
129  branchIDsToReplace_[i].push_back(*it);
130  }
131  }
132  }
133  secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
134  }
135  }
136 
138 
139  void
142  primaryFileSequence_->endJob();
144  }
145 
146  std::unique_ptr<FileBlock>
148  std::unique_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
150  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
151  }
152  return std::move(fb);
153  }
154 
156  primaryFileSequence_->closeFile_();
157  }
158 
159  std::shared_ptr<RunAuxiliary>
161  return primaryFileSequence_->readRunAuxiliary_();
162  }
163 
164  std::shared_ptr<LuminosityBlockAuxiliary>
166  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
167  }
168 
169  void
171  primaryFileSequence_->readRun_(runPrincipal);
173  bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
174  if(found) {
175  std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
176  checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
177  secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryAuxiliary,
178  secondaryFileSequence_->fileProductRegistry(),
180  nullptr,
181  runPrincipal.index());
182  secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
183  checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
184  runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
185  } else {
186  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
187  << " Run " << runPrincipal.run()
188  << " is not found in the secondary input files\n";
189  }
190  }
191  }
192 
193  void
195  primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
197  bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
198  if(found) {
199  std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
200  checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
201  secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(secondaryAuxiliary,
202  secondaryFileSequence_->fileProductRegistry(),
204  nullptr,
205  lumiPrincipal.index());
206  secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
207  checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
208  lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
209  } else {
210  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
211  << " Run " << lumiPrincipal.run()
212  << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
213  << " is not found in the secondary input files\n";
214  }
215  }
216  }
217 
218  void
220  primaryFileSequence_->readEvent(eventPrincipal);
222  bool found = secondaryFileSequence_->skipToItem(eventPrincipal.run(),
223  eventPrincipal.luminosityBlock(),
224  eventPrincipal.id().event());
225  if(found) {
226  EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
227  secondaryFileSequence_->readEvent(secondaryEventPrincipal);
228  checkConsistency(eventPrincipal, secondaryEventPrincipal);
229  checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
230  eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
231  eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
232  secondaryEventPrincipal.clearPrincipal();
233  } else {
234  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
235  eventPrincipal.id() << " is not found in the secondary input files\n";
236  }
237  }
238  }
239 
240  bool
241  PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
242  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
243  if(!found) return false;
244  EventSourceSentry sentry(*this, streamContext);
245  readEvent_(eventPrincipal);
246  return true;
247  }
248 
254  InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
256  if(itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
257  if(!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
258  return IsSynchronize;
259  }
260  }
261  }
262  return itemType;
263  }
264 
265  void
267  primaryFileSequence_->closeFile_();
268  }
269 
273  }
274 
275  // Rewind to before the first event that was read.
276  void
278  primaryFileSequence_->rewind_();
279  }
280 
281  // Advance "offset" events. Offset can be positive or negative (or zero).
282  void
284  primaryFileSequence_->skipEvents(offset);
285  }
286 
287  bool
288  PoolSource::goToEvent_(EventID const& eventID) {
289  return primaryFileSequence_->goToEvent(eventID);
290  }
291 
292  void
294 
296 
297  std::vector<std::string> defaultStrings;
298  desc.setComment("Reads EDM/Root files.");
299  desc.addUntracked<std::vector<std::string> >("fileNames")
300  ->setComment("Names of files to be processed.");
301  desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
302  ->setComment("Names of secondary files to be processed.");
303  desc.addUntracked<bool>("needSecondaryFileNames", false)
304  ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
305  desc.addUntracked<std::string>("overrideCatalog", std::string());
308 
309  descriptions.add("source", desc);
310  }
311 
312  bool
314  return true;
315  }
316 
319  return primaryFileSequence_->forwardState();
320  }
321 
324  return primaryFileSequence_->reverseState();
325  }
326 }
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:64
EventNumber_t event() const
Definition: EventID.h:41
void clearPrincipal()
Definition: Principal.cc:319
int i
Definition: DBlmapReader.cc:9
SharedResourcesAcquirer createAcquirerForSourceDelayedReader()
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
std::unique_ptr< SharedResourcesAcquirer > resourceSharedWithDelayedReaderPtr_
Definition: PoolSource.h:69
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
virtual std::unique_ptr< FileBlock > readFile_()
Definition: PoolSource.cc:147
RunNumber_t run() const
virtual ProcessingController::ForwardState forwardState_() const
Definition: PoolSource.cc:318
virtual bool goToEvent_(EventID const &eventID)
Definition: PoolSource.cc:288
std::shared_ptr< RunPrincipal > secondaryRunPrincipal_
Definition: PoolSource.h:64
tuple lumi
Definition: fjr2json.py:35
std::map< BranchKey, BranchDescription > ProductList
EventID const & id() const
unsigned long long EventNumber_t
processConfiguration
Definition: Schedule.cc:369
LuminosityBlockAuxiliary const & aux() const
void mergeProvenanceRetrievers(EventPrincipal const &other)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
LuminosityBlockIndex index() const
LuminosityBlockNumber_t luminosityBlock() const
RunNumber_t run() const
Definition: RunPrincipal.h:61
static void fillDescription(ParameterSetDescription &desc)
#define nullptr
unsigned int LuminosityBlockNumber_t
static EventNumber_t const invalidEvent
std::unique_ptr< RootPrimaryFileSequence > primaryFileSequence_
Definition: PoolSource.h:62
void setComment(std::string const &value)
std::shared_ptr< LuminosityBlockPrincipal > secondaryLumiPrincipal_
Definition: PoolSource.h:65
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
LuminosityBlockNumber_t luminosityBlock() const
virtual bool randomAccess_() const
Definition: PoolSource.cc:313
static RunNumber_t const invalidRun
virtual ProcessingController::ReverseState reverseState_() const
Definition: PoolSource.cc:323
static SharedResourcesRegistry * instance()
static void reportReadBranches()
Definition: InputFile.cc:113
StreamID streamID() const
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:57
virtual ~PoolSource()
Definition: PoolSource.cc:137
#define end
Definition: vmac.h:37
SharedResourcesAcquirer * resourceSharedWithDelayedReader_() const override
Definition: PoolSource.cc:271
virtual void readRun_(RunPrincipal &runPrincipal)
Definition: PoolSource.cc:170
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()
Definition: PoolSource.cc:165
ItemType state() const
Definition: InputSource.h:347
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:293
unsigned int value() const
Definition: StreamID.h:46
virtual void rewind_()
Definition: PoolSource.cc:277
static LuminosityBlockNumber_t const invalidLumi
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext) override
Definition: PoolSource.cc:241
virtual void readEvent_(EventPrincipal &eventPrincipal)
Definition: PoolSource.cc:219
std::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:67
virtual void endJob()
Definition: PoolSource.cc:140
void add(std::string const &label, ParameterSetDescription const &psetDescription)
virtual void closeFile_()
Definition: PoolSource.cc:155
RunIndex index() const
Definition: RunPrincipal.h:53
#define begin
Definition: vmac.h:30
virtual ItemType getNextItemType()
Definition: PoolSource.cc:250
virtual void skip(int offset)
Definition: PoolSource.cc:283
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:127
virtual void preForkReleaseResources()
Definition: PoolSource.cc:266
unsigned int RunNumber_t
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:209
std::unique_ptr< RootSecondaryFileSequence > secondaryFileSequence_
Definition: PoolSource.h:63
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:806
std::vector< std::unique_ptr< EventPrincipal > > secondaryEventPrincipals_
Definition: PoolSource.h:66
tuple size
Write out results.
virtual std::shared_ptr< RunAuxiliary > readRunAuxiliary_()
Definition: PoolSource.cc:160
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal)
Definition: PoolSource.cc:194