CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
PoolSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "PoolSource.h"
15 
16 #include <set>
17 
18 namespace edm {
19 
20  class LuminosityBlockID;
21  class EventID;
22 
23  namespace {
24  void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
25  ProcessHistory const& ph1 = primary.processHistory();
26  ProcessHistory const& ph2 = secondary.processHistory();
27  if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
28  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
29  "The secondary file is not an ancestor of the primary file\n";
30  }
31  }
32  void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
33  if (!isSameEvent(primary, secondary)) {
34  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
35  primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
36  }
37  }
38  void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
39  if (primary.id() != secondary.id()) {
40  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
41  primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
42  }
43  }
44  void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
45  if (primary.id() != secondary.id()) {
46  throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
47  primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
48  }
49  }
50  }
51 
53  VectorInputSource(pset, desc),
54  rootServiceChecker_(),
55  primaryFileSequence_(new RootInputFileSequence(pset, *this, catalog(), principalCache(), primary())),
56  secondaryFileSequence_(catalog(1).empty() ? 0 :
57  new RootInputFileSequence(pset, *this, catalog(1), principalCache(), false)),
58  secondaryRunPrincipal_(),
59  secondaryLumiPrincipal_(),
60  secondaryEventPrincipal_(secondaryFileSequence_ ? new EventPrincipal(secondaryFileSequence_->fileProductRegistry(), processConfiguration()) : 0),
61  branchIDsToReplace_(),
62  numberOfEventsBeforeBigSkip_(0)
63  {
65  boost::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
66  ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
67  ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
68  typedef ProductRegistry::ProductList::const_iterator const_iterator;
69  typedef ProductRegistry::ProductList::iterator iterator;
70  //this is the registry used by the 'outside' world and only has the primary file information in it at present
72  for (const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
73  if (it->second.present()) {
74  idsToReplace[it->second.branchType()].insert(it->second.branchID());
75  //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
76  iterator itFound = fullList.find(it->first);
77  if(itFound != fullList.end()) {
78  itFound->second.dropped()=false;
79  }
80  }
81  }
82  for (const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
83  if (it->second.present()) idsToReplace[it->second.branchType()].erase(it->second.branchID());
84  }
85  if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
86  secondaryFileSequence_.reset();
87  }
88  else {
89  for (int i = InEvent; i < NumBranchTypes; ++i) {
90  branchIDsToReplace_[i].reserve(idsToReplace[i].size());
91  for (std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
92  it != itEnd; ++it) {
93  branchIDsToReplace_[i].push_back(*it);
94  }
95  }
96  }
97  }
98  }
99 
101 
102  void
105  primaryFileSequence_->endJob();
106  }
107 
108  boost::shared_ptr<FileBlock>
110  boost::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_(principalCache());
112  fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
113  }
114  return fb;
115  }
116 
118  primaryFileSequence_->closeFile_();
119  }
120 
121  boost::shared_ptr<RunAuxiliary>
123  return primaryFileSequence_->readRunAuxiliary_();
124  }
125 
126  boost::shared_ptr<LuminosityBlockAuxiliary>
128  return primaryFileSequence_->readLuminosityBlockAuxiliary_();
129  }
130 
// Reads the next run through the primary file sequence and, in the branch
// shown below, combines it with the matching run of the secondary file
// sequence.
//
// rpCache is forwarded unchanged to primaryFileSequence_->readRun_().
// Returns the primary run principal.
// Throws Exception(errors::MismatchedInputFiles) when skipToItem() does not
// find the run in the secondary input files; checkConsistency() and
// checkHistoryConsistency() throw the same category on a mismatch.
//
// NOTE(review): listing lines 133 and 140 are absent from this extract. The
// unmatched closing brace at 149 and the fallback return at 150 suggest that
// line 133 opens a guard on secondaryFileSequence_, and line 140 presumably
// assigns secondaryRunPrincipal_ (the local `rp` is not used by any visible
// line) -- confirm against the complete file.
131  boost::shared_ptr<RunPrincipal>
132  PoolSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
134  boost::shared_ptr<RunPrincipal> primaryPrincipal = primaryFileSequence_->readRun_(rpCache);
// Look the same run up in the secondary sequence; the last two arguments are passed as 0.
135  bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), 0U, 0U);
136  if (found) {
137  boost::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
// Run ids must agree before anything from the secondary file is used.
138  checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
139  boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration()));
141  checkHistoryConsistency(*primaryPrincipal, *secondaryRunPrincipal_);
// Hand the run-level branch ids collected in branchIDsToReplace_ to recombine().
142  primaryPrincipal->recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
143  } else {
144  throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
145  << " Run " << primaryPrincipal->run()
146  << " is not found in the secondary input files\n";
147  }
148  return primaryPrincipal;
149  }
// Fallback path: only the primary file sequence is consulted.
150  return primaryFileSequence_->readRun_(rpCache);
151  }
152 
// Reads the next luminosity block through the primary file sequence and, in
// the branch shown below, combines it with the matching luminosity block of
// the secondary file sequence.
//
// lbCache is forwarded unchanged to primaryFileSequence_->readLuminosityBlock_().
// Returns the primary luminosity block principal.
// Throws Exception(errors::MismatchedInputFiles) when skipToItem() does not
// find the run/luminosity block in the secondary input files;
// checkConsistency() and checkHistoryConsistency() throw the same category on
// a mismatch.
//
// NOTE(review): listing line 155 is absent from this extract. The unmatched
// closing brace at 172 and the fallback return at 173 suggest that it opens a
// guard on secondaryFileSequence_ -- confirm against the complete file.
153  boost::shared_ptr<LuminosityBlockPrincipal>
154  PoolSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
156  boost::shared_ptr<LuminosityBlockPrincipal> primaryPrincipal = primaryFileSequence_->readLuminosityBlock_(lbCache);
// Look the same run/luminosity block up in the secondary sequence; the last argument is passed as 0.
157  bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), primaryPrincipal->luminosityBlock(), 0U);
158  if (found) {
159  boost::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
// Luminosity block ids must agree before anything from the secondary file is used.
160  checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
// The new secondary principal is constructed with secondaryRunPrincipal_ as its last argument.
161  boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration(), secondaryRunPrincipal_));
162  secondaryLumiPrincipal_ = secondaryFileSequence_->readLuminosityBlock_(lbp);
163  checkHistoryConsistency(*primaryPrincipal, *secondaryLumiPrincipal_);
// Hand the lumi-level branch ids collected in branchIDsToReplace_ to recombine().
164  primaryPrincipal->recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
165  } else {
166  throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
167  << " Run " << primaryPrincipal->run()
168  << " LuminosityBlock " << primaryPrincipal->luminosityBlock()
169  << " is not found in the secondary input files\n";
170  }
171  return primaryPrincipal;
172  }
// Fallback path: only the primary file sequence is consulted.
173  return primaryFileSequence_->readLuminosityBlock_(lbCache);
174  }
175 
178  EventSourceSentry(*this);
181  bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(),
182  primaryPrincipal->luminosityBlock(),
183  primaryPrincipal->id().event());
184  if (found) {
186  checkConsistency(*primaryPrincipal, *secondaryPrincipal);
187  checkHistoryConsistency(*primaryPrincipal, *secondaryPrincipal);
188  primaryPrincipal->recombine(*secondaryPrincipal, branchIDsToReplace_[InEvent]);
189  secondaryEventPrincipal_->clearPrincipal();
190  } else {
191  throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
192  primaryPrincipal->id() << " is not found in the secondary input files\n";
193  }
194  }
195  if(receiver_) {
197  }
198  return primaryPrincipal;
199  }
200 
203  bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
204  if (!found) return 0;
205  return readEvent_();
206  }
207 
210  if(receiver_ &&
212  receiver_->receive();
213  unsigned long toSkip = receiver_->numberToSkip();
214  if (0 != toSkip) {
215  primaryFileSequence_->skipEvents(toSkip, principalCache());
217  }
218  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
220  return IsStop;
221  }
222  }
223  return primaryFileSequence_->getNextItemType();;
224  }
225 
226  void
228  primaryFileSequence_->closeFile_();
229  }
230 
// Stores iReceiver in receiver_, calls receive() on it, rewinds the source and
// then lowers the remaining-event count by receiver_->numberToSkip().
//
// NOTE(review): listing line 235 is absent from this extract, so whatever
// happens between receive() and rewind() cannot be seen here -- confirm
// against the complete file.
231  void
232  PoolSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource> iReceiver) {
233  receiver_ = iReceiver;
234  receiver_->receive();
236  rewind();
237  decreaseRemainingEventsBy(receiver_->numberToSkip());
238  }
239 
240  // Rewind to before the first event that was read.
241  void
243  primaryFileSequence_->rewind_();
244  if (receiver_) {
245  unsigned int numberToSkip = receiver_->numberToSkip();
246  if(0 != numberToSkip) {
247  primaryFileSequence_->skipEvents(numberToSkip, principalCache());
248  }
249  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
250  }
251 
252  }
253 
254  // Advance "offset" events. Offset can be positive or negative (or zero).
255  void
257  primaryFileSequence_->skipEvents(offset, principalCache());
258  }
259 
260  bool
261  PoolSource::goToEvent_(EventID const& eventID) {
262  return primaryFileSequence_->goToEvent(eventID, principalCache());
263  }
264 
265  void
267  assert (!secondaryFileSequence_);
268  primaryFileSequence_->readMany(number, result);
269  }
270 
271  void
272  PoolSource::readManyRandom_(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
273  assert (!secondaryFileSequence_);
274  primaryFileSequence_->readManyRandom(number, result, fileSeqNumber);
275  }
276 
277  void
278  PoolSource::readManySequential_(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
279  assert (!secondaryFileSequence_);
280  primaryFileSequence_->readManySequential(number, result, fileSeqNumber);
281  }
282 
283  void
285  assert (!secondaryFileSequence_);
286  primaryFileSequence_->readManySpecified(events, result);
287  }
288 
289  void
290  PoolSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
291  assert (!secondaryFileSequence_);
292  primaryFileSequence_->dropUnwantedBranches_(wantedBranches);
293  }
294 
295  void
297 
299 
300  desc.setComment("Reads EDM/Root files.");
303 
304  descriptions.add("source", desc);
305  }
306 
307  bool
309  return true;
310  }
311 
314  return primaryFileSequence_->forwardState();
315  }
316 
319  return primaryFileSequence_->reverseState();
320  }
321 }
virtual boost::shared_ptr< RunAuxiliary > readRunAuxiliary_()
Definition: PoolSource.cc:122
PoolSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: PoolSource.cc:52
EventNumber_t event() const
Definition: EventID.h:44
int i
Definition: DBlmapReader.cc:9
bool isSameEvent(EventAuxiliary const &a, EventAuxiliary const &b)
EventPrincipal *const eventPrincipalCache()
Definition: InputSource.cc:156
virtual void readMany_(int number, EventPrincipalVector &result)
Definition: PoolSource.cc:266
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:472
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
RunNumber_t run() const
virtual ProcessingController::ForwardState forwardState_() const
Definition: PoolSource.cc:313
virtual boost::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock_(boost::shared_ptr< LuminosityBlockPrincipal > lbCache)
Definition: PoolSource.cc:154
virtual void readManySpecified_(std::vector< EventID > const &events, EventPrincipalVector &result)
Definition: PoolSource.cc:284
virtual bool goToEvent_(EventID const &eventID)
Definition: PoolSource.cc:261
virtual boost::shared_ptr< RunPrincipal > readRun_(boost::shared_ptr< RunPrincipal > rpCache)
Definition: PoolSource.cc:132
std::map< BranchKey, BranchDescription > ProductList
EventID const & id() const
static void fillDescription(ParameterSetDescription &desc)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:582
LuminosityBlockNumber_t luminosityBlock() const
virtual void readManyRandom_(int number, EventPrincipalVector &result, unsigned int &fileSeqNumber)
Definition: PoolSource.cc:272
boost::scoped_ptr< EventPrincipal > secondaryEventPrincipal_
Definition: PoolSource.h:67
boost::shared_ptr< LuminosityBlockPrincipal > const luminosityBlockPrincipal() const
Definition: InputSource.cc:280
static void fillDescription(ParameterSetDescription &desc)
virtual void dropUnwantedBranches_(std::vector< std::string > const &wantedBranches)
Definition: PoolSource.cc:290
void setComment(std::string const &value)
bool isAncestor(ProcessHistory const &a, ProcessHistory const &b)
int remainingEvents() const
Definition: InputSource.h:176
virtual bool randomAccess_() const
Definition: PoolSource.cc:308
virtual ProcessingController::ReverseState reverseState_() const
Definition: PoolSource.cc:318
boost::shared_ptr< RunPrincipal > secondaryRunPrincipal_
Definition: PoolSource.h:65
void rewind()
Begin again at the first event.
Definition: InputSource.h:138
tuple result
Definition: query.py:137
std::vector< EventPrincipalVectorElement > EventPrincipalVector
unsigned int numberOfEventsBeforeBigSkip_
Definition: PoolSource.h:72
virtual EventPrincipal * readEvent_()
Definition: PoolSource.cc:177
virtual ~PoolSource()
Definition: PoolSource.cc:100
#define end
Definition: vmac.h:38
unsigned int offset(bool)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
PrincipalCache const & principalCache() const
Definition: InputSource.h:316
virtual void readManySequential_(int number, EventPrincipalVector &result, unsigned int &fileSeqNumber)
Definition: PoolSource.cc:278
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:588
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: PoolSource.cc:296
virtual void postForkReacquireResources(boost::shared_ptr< edm::multicore::MessageReceiverForSource >)
Definition: PoolSource.cc:232
virtual void rewind_()
Definition: PoolSource.cc:242
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:299
virtual void endJob()
Definition: PoolSource.cc:103
void add(std::string const &label, ParameterSetDescription const &psetDescription)
boost::array< std::vector< BranchID >, NumBranchTypes > branchIDsToReplace_
Definition: PoolSource.h:68
ProductList & productListUpdator()
boost::scoped_ptr< RootInputFileSequence > primaryFileSequence_
Definition: PoolSource.h:63
virtual void closeFile_()
Definition: PoolSource.cc:117
boost::shared_ptr< edm::multicore::MessageReceiverForSource > receiver_
Definition: PoolSource.h:71
#define begin
Definition: vmac.h:31
tuple events
Definition: patZpeak.py:19
virtual ItemType getNextItemType()
Definition: PoolSource.cc:209
virtual void skip(int offset)
Definition: PoolSource.cc:256
boost::shared_ptr< LuminosityBlockPrincipal > secondaryLumiPrincipal_
Definition: PoolSource.h:66
boost::scoped_ptr< RootInputFileSequence > secondaryFileSequence_
Definition: PoolSource.h:64
virtual void preForkReleaseResources()
Definition: PoolSource.cc:227
virtual boost::shared_ptr< FileBlock > readFile_()
Definition: PoolSource.cc:109
virtual EventPrincipal * readIt(EventID const &id)
Definition: PoolSource.cc:202
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:190
void recombine(Principal &other, std::vector< BranchID > const &bids)
Definition: Principal.cc:576
tuple size
Write out results.
virtual boost::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()
Definition: PoolSource.cc:127