CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
4 #include "InputFile.h"
5 #include "InputType.h"
17 
18 #include <set>
19 
20 namespace edm {
21 
22  class LuminosityBlockID;
23  class EventID;
24 
25  namespace {
26  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
27  ProcessHistory const& ph1 = primary.processHistory();
28  ProcessHistory const& ph2 = secondary.processHistory();
29  if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
30  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
31  "The secondary file is not an ancestor of the primary file\n";
32  }
33  }
34  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
35  if(!isSameEvent(primary, secondary)) {
36  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
37  primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
38  }
39  }
40  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
41  if(primary.id() != secondary.id()) {
42  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
43  primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
44  }
45  }
46  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
47  if(primary.id() != secondary.id()) {
48  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
49  primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
50  }
51  }
52  }
53 
55  VectorInputSource(pset, desc),
56  rootServiceChecker_(),
57  primaryFileSequence_(new RootInputFileSequence(pset, *this, catalog(), principalCache(),
58  primary() ? InputType::Primary : InputType::SecondarySource)),
59  secondaryFileSequence_(catalog(1).empty() ? 0 :
60  new RootInputFileSequence(pset, *this, catalog(1), principalCache(), InputType::SecondaryFile)),
61  secondaryRunPrincipal_(),
62  secondaryLumiPrincipal_(),
63  secondaryEventPrincipal_(secondaryFileSequence_ ? new EventPrincipal(secondaryFileSequence_->fileProductRegistry(), processConfiguration()) : 0),
64  branchIDsToReplace_(),
65  numberOfEventsBeforeBigSkip_(0) {
67  assert(primary());
68  std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
69  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
70  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
71  typedef ProductRegistry::ProductList::const_iterator const_iterator;
72  typedef ProductRegistry::ProductList::iterator iterator;
73  //this is the registry used by the 'outside' world and only has the primary file information in it at present
75  for(const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
76  if(it->second.present()) {
77  idsToReplace[it->second.branchType()].insert(it->second.branchID());
78  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
79  iterator itFound = fullList.find(it->first);
80  if(itFound != fullList.end()) {
81  itFound->second.dropped()=false;
82  }
83  }
84  }
85  for(const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
86  if(it->second.present()) idsToReplace[it->second.branchType()].erase(it->second.branchID());
87  }
88  if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
89  secondaryFileSequence_.reset();
90  } else {
91  for(int i = InEvent; i < NumBranchTypes; ++i) {
92  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
93  for(std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
94  it != itEnd; ++it) {
95  branchIDsToReplace_[i].push_back(*it);
96  }
97  }
98  }
99  }
100  }
101 
103 
104  void
107  primaryFileSequence_->endJob();
109  }
110 
111  boost::shared_ptr<FileBlock>
113  boost::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_(principalCache());
115  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
116  }
117  return fb;
118  }
119 
121  primaryFileSequence_->closeFile_();
122  }
123 
124  boost::shared_ptr<RunAuxiliary>
126  return primaryFileSequence_->readRunAuxiliary_();
127  }
128 
129  boost::shared_ptr<LuminosityBlockAuxiliary>
131  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
132  }
133 
134  boost::shared_ptr<RunPrincipal>
135  PoolSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
137  boost::shared_ptr<RunPrincipal> primaryPrincipal = primaryFileSequence_->readRun_(rpCache);
138  bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), 0U, 0U);
139  if(found) {
140  boost::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
141  checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
142  boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration()));
144  checkHistoryConsistency(*primaryPrincipal, *secondaryRunPrincipal_);
145  primaryPrincipal->recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
146  } else {
147  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
148  << " Run " << primaryPrincipal->run()
149  << " is not found in the secondary input files\n";
150  }
151  return primaryPrincipal;
152  }
153  return primaryFileSequence_->readRun_(rpCache);
154  }
155 
156  boost::shared_ptr<LuminosityBlockPrincipal>
157  PoolSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
159  boost::shared_ptr<LuminosityBlockPrincipal> primaryPrincipal = primaryFileSequence_->readLuminosityBlock_(lbCache);
160  bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), primaryPrincipal->luminosityBlock(), 0U);
161  if(found) {
162  boost::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
163  checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
164  boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration(), secondaryRunPrincipal_));
165  secondaryLumiPrincipal_ = secondaryFileSequence_->readLuminosityBlock_(lbp);
166  checkHistoryConsistency(*primaryPrincipal, *secondaryLumiPrincipal_);
167  primaryPrincipal->recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
168  } else {
169  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
170  << " Run " << primaryPrincipal->run()
171  << " LuminosityBlock " << primaryPrincipal->luminosityBlock()
172  << " is not found in the secondary input files\n";
173  }
174  return primaryPrincipal;
175  }
176  return primaryFileSequence_->readLuminosityBlock_(lbCache);
177  }
178 
181  EventSourceSentry(*this);
184  bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(),
185  primaryPrincipal->luminosityBlock(),
186  primaryPrincipal->id().event());
187  if(found) {
189  checkConsistency(*primaryPrincipal, *secondaryPrincipal);
190  checkHistoryConsistency(*primaryPrincipal, *secondaryPrincipal);
191  primaryPrincipal->recombine(*secondaryPrincipal, branchIDsToReplace_[InEvent]);
192  primaryPrincipal->mergeMappers(*secondaryPrincipal);
193  secondaryEventPrincipal_->clearPrincipal();
194  } else {
195  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
196  primaryPrincipal->id() << " is not found in the secondary input files\n";
197  }
198  }
199  if(receiver_) {
201  }
202  return primaryPrincipal;
203  }
204 
207  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
208  if(!found) return 0;
209  return readEvent_();
210  }
211 
214  if(receiver_ &&
216  receiver_->receive();
217  unsigned long toSkip = receiver_->numberToSkip();
218  if(0 != toSkip) {
219  primaryFileSequence_->skipEvents(toSkip, principalCache());
221  }
222  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
224  return IsStop;
225  }
226  }
227  return primaryFileSequence_->getNextItemType();;
228  }
229 
230  void
232  primaryFileSequence_->closeFile_();
233  }
234 
235  void
236  PoolSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource> iReceiver) {
237  receiver_ = iReceiver;
238  receiver_->receive();
240  rewind();
241  decreaseRemainingEventsBy(receiver_->numberToSkip());
242  }
243 
244  // Rewind to before the first event that was read.
245  void
247  primaryFileSequence_->rewind_();
248  if(receiver_) {
249  unsigned int numberToSkip = receiver_->numberToSkip();
250  if(0 != numberToSkip) {
251  primaryFileSequence_->skipEvents(numberToSkip, principalCache());
252  }
253  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
254  }
255 
256  }
257 
258  // Advance "offset" events. Offset can be positive or negative (or zero).
259  void
261  primaryFileSequence_->skipEvents(offset, principalCache());
262  }
263 
264  bool
265  PoolSource::goToEvent_(EventID const& eventID) {
266  return primaryFileSequence_->goToEvent(eventID);
267  }
268 
271  assert(!secondaryFileSequence_);
272  return primaryFileSequence_->readOneRandom();
273  }
274 
277  assert(!secondaryFileSequence_);
278  return primaryFileSequence_->readOneRandomWithID(lumiID);
279  }
280 
283  assert(!secondaryFileSequence_);
284  return primaryFileSequence_->readOneSequential();
285  }
286 
289  assert(!secondaryFileSequence_);
290  return primaryFileSequence_->readOneSequentialWithID(lumiID);
291  }
292 
295  assert(!secondaryFileSequence_);
296  return primaryFileSequence_->readOneSpecified(id);
297  }
298 
299  void
300  PoolSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
301  assert(!secondaryFileSequence_);
302  primaryFileSequence_->dropUnwantedBranches_(wantedBranches);
303  }
304 
305  void
307 
309 
310  desc.setComment("Reads EDM/Root files.");
313 
314  descriptions.add("source", desc);
315  }
316 
317  bool
319  return true;
320  }
321 
324  return primaryFileSequence_->forwardState();
325  }
326 
329  return primaryFileSequence_->reverseState();
330  }
331 }
virtual boost::shared_ptr< RunAuxiliary > readRunAuxiliary_()
Definition: PoolSource.cc:125
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:54
EventNumber_t event() const
Definition: EventID.h:44
int i
Definition: DBlmapReader.cc:9
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:482
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
RunNumber_t run() const
virtual ProcessingController::ForwardState forwardState_() const
Definition: PoolSource.cc:323
virtual boost::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock_(boost::shared_ptr< LuminosityBlockPrincipal > lbCache)
Definition: PoolSource.cc:157
virtual bool goToEvent_(EventID const &eventID)
Definition: PoolSource.cc:265
virtual boost::shared_ptr< RunPrincipal > readRun_(boost::shared_ptr< RunPrincipal > rpCache)
Definition: PoolSource.cc:135
std::map< BranchKey, BranchDescription > ProductList
EventID const & id() const
static void fillDescription(ParameterSetDescription &desc)
std::unique_ptr< RootInputFileSequence > secondaryFileSequence_
Definition: PoolSource.h:65
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:606
LuminosityBlockNumber_t luminosityBlock() const
boost::shared_ptr< LuminosityBlockPrincipal > const luminosityBlockPrincipal() const
Definition: InputSource.cc:281
static void fillDescription(ParameterSetDescription &desc)
virtual EventPrincipal * readOneRandomWithID(LuminosityBlockID const &lumiID)
Definition: PoolSource.cc:276
virtual void dropUnwantedBranches_(std::vector< std::string > const &wantedBranches)
Definition: PoolSource.cc:300
void setComment(std::string const &value)
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
int remainingEvents() const
Definition: InputSource.h:170
virtual bool randomAccess_() const
Definition: PoolSource.cc:318
virtual ProcessingController::ReverseState reverseState_() const
Definition: PoolSource.cc:328
virtual EventPrincipal * readOneRandom()
Definition: PoolSource.cc:270
boost::shared_ptr< RunPrincipal > secondaryRunPrincipal_
Definition: PoolSource.h:66
virtual EventPrincipal * readOneSequential()
Definition: PoolSource.cc:282
unsigned int numberOfEventsBeforeBigSkip_
Definition: PoolSource.h:73
virtual EventPrincipal * readEvent_()
Definition: PoolSource.cc:180
virtual ~PoolSource()
Definition: PoolSource.cc:102
std::unique_ptr< EventPrincipal > secondaryEventPrincipal_
Definition: PoolSource.h:68
#define end
Definition: vmac.h:38
void mergeMappers(EventPrincipal const &other)
unsigned int offset(bool)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
PrincipalCache const & principalCache() const
Definition: InputSource.h:312
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:612
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:306
virtual void postForkReacquireResources(boost::shared_ptr< edm::multicore::MessageReceiverForSource >)
Definition: PoolSource.cc:236
virtual void rewind_()
Definition: PoolSource.cc:246
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:295
std::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:69
virtual void endJob()
Definition: PoolSource.cc:105
void add(std::string const &label, ParameterSetDescription const &psetDescription)
ProductList & productListUpdator()
bool primary() const
Accessor for primary input source flag.
Definition: InputSource.h:187
virtual EventPrincipal * readOneSequentialWithID(LuminosityBlockID const &lumiID)
Definition: PoolSource.cc:288
virtual void closeFile_()
Definition: PoolSource.cc:120
std::unique_ptr< RootInputFileSequence > primaryFileSequence_
Definition: PoolSource.h:64
virtual EventPrincipal * readOneSpecified(EventID const &id)
Definition: PoolSource.cc:294
boost::shared_ptr< edm::multicore::MessageReceiverForSource > receiver_
Definition: PoolSource.h:72
#define begin
Definition: vmac.h:31
virtual ItemType getNextItemType()
Definition: PoolSource.cc:213
virtual void skip(int offset)
Definition: PoolSource.cc:260
boost::shared_ptr< LuminosityBlockPrincipal > secondaryLumiPrincipal_
Definition: PoolSource.h:67
static void reportReadBranches()
Definition: InputFile.cc:100
virtual void preForkReleaseResources()
Definition: PoolSource.cc:231
virtual boost::shared_ptr< FileBlock > readFile_()
Definition: PoolSource.cc:112
void rewind()
Begin again at the first event.
Definition: InputSource.cc:407
virtual EventPrincipal * readIt(EventID const &id)
Definition: PoolSource.cc:206
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:184
EventPrincipal * eventPrincipalCache()
Definition: InputSource.cc:139
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:713
tuple size
Write out results.
virtual boost::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()
Definition: PoolSource.cc:130