00001
00002
00003 #include "PoolSource.h"
00004 #include "RootInputFileSequence.h"
00005 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00006 #include "FWCore/Framework/interface/EventPrincipal.h"
00007 #include "FWCore/Framework/interface/FileBlock.h"
00008 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00009 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00010 #include "FWCore/Framework/interface/RunPrincipal.h"
00011 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00012 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00013 #include "FWCore/Utilities/interface/EDMException.h"
00014 #include "FWCore/Utilities/interface/Exception.h"
00015
00016 #include <set>
00017
00018 namespace edm {
00019
00020 class LuminosityBlockID;
00021 class EventID;
00022
00023 namespace {
00024 void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
00025 ProcessHistory const& ph1 = primary.processHistory();
00026 ProcessHistory const& ph2 = secondary.processHistory();
00027 if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
00028 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00029 "The secondary file is not an ancestor of the primary file\n";
00030 }
00031 }
00032 void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
00033 if (!isSameEvent(primary, secondary)) {
00034 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00035 primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
00036 }
00037 }
00038 void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
00039 if (primary.id() != secondary.id()) {
00040 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00041 primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
00042 }
00043 }
00044 void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
00045 if (primary.id() != secondary.id()) {
00046 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00047 primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
00048 }
00049 }
00050 }
00051
// Builds the source from its configuration.
//
// The primary file sequence is always created. A secondary file sequence is
// created only when a second catalog is configured (catalog(1) non-empty). It
// supplies products that are present in the secondary files but not in the
// primary ones; those branch IDs end up in branchIDsToReplace_ and are later
// passed to recombine() by readRun_/readLuminosityBlock_/readEvent_. If no such
// products exist, the secondary sequence is discarded again.
PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc) :
  VectorInputSource(pset, desc),
  rootServiceChecker_(),
  primaryFileSequence_(new RootInputFileSequence(pset, *this, catalog(), principalCache(), primary())),
  // Null unless a secondary catalog is configured.
  secondaryFileSequence_(catalog(1).empty() ? 0 :
                         new RootInputFileSequence(pset, *this, catalog(1), principalCache(), false)),
  secondaryRunPrincipal_(),
  secondaryLumiPrincipal_(),
  // Reusable event principal built on the secondary files' product registry.
  // NOTE(review): reads secondaryFileSequence_, so it relies on that member being
  // declared (hence initialized) earlier in the class - confirm in PoolSource.h.
  secondaryEventPrincipal_(secondaryFileSequence_ ? new EventPrincipal(secondaryFileSequence_->fileProductRegistry(), processConfiguration()) : 0),
  branchIDsToReplace_(),
  numberOfEventsBeforeBigSkip_(0)
{
  if (secondaryFileSequence_) {
    // Per branch type: IDs of branches to take from the secondary files.
    // std::set keeps them unique and sorted.
    boost::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
    ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
    ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
    typedef ProductRegistry::ProductList::const_iterator const_iterator;
    typedef ProductRegistry::ProductList::iterator iterator;

    // The registry obtained through productRegistryUpdate(); entries for
    // products found in the secondary files are un-dropped below.
    ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
    // Collect every product present in the secondary files.
    for (const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
      if (it->second.present()) {
        idsToReplace[it->second.branchType()].insert(it->second.branchID());

        iterator itFound = fullList.find(it->first);
        if(itFound != fullList.end()) {
          itFound->second.dropped()=false;
        }
      }
    }
    // Products the primary files already provide need not be replaced.
    for (const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
      if (it->second.present()) idsToReplace[it->second.branchType()].erase(it->second.branchID());
    }
    if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
      // Nothing to take from the secondary files: stop using them.
      // NOTE(review): secondaryEventPrincipal_ remains allocated in this case.
      secondaryFileSequence_.reset();
    }
    else {
      // Copy each (sorted) set into the member vector handed to recombine().
      for (int i = InEvent; i < NumBranchTypes; ++i) {
        branchIDsToReplace_[i].reserve(idsToReplace[i].size());
        for (std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
             it != itEnd; ++it) {
          branchIDsToReplace_[i].push_back(*it);
        }
      }
    }
  }
}
00099
00100 PoolSource::~PoolSource() {}
00101
00102 void
00103 PoolSource::endJob() {
00104 if (secondaryFileSequence_) secondaryFileSequence_->endJob();
00105 primaryFileSequence_->endJob();
00106 }
00107
00108 boost::shared_ptr<FileBlock>
00109 PoolSource::readFile_() {
00110 boost::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_(principalCache());
00111 if (secondaryFileSequence_) {
00112 fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
00113 }
00114 return fb;
00115 }
00116
00117 void PoolSource::closeFile_() {
00118 primaryFileSequence_->closeFile_();
00119 }
00120
00121 boost::shared_ptr<RunAuxiliary>
00122 PoolSource::readRunAuxiliary_() {
00123 return primaryFileSequence_->readRunAuxiliary_();
00124 }
00125
00126 boost::shared_ptr<LuminosityBlockAuxiliary>
00127 PoolSource::readLuminosityBlockAuxiliary_() {
00128 return primaryFileSequence_->readLuminosityBlockAuxiliary_();
00129 }
00130
00131 boost::shared_ptr<RunPrincipal>
00132 PoolSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00133 if (secondaryFileSequence_ && !branchIDsToReplace_[InRun].empty()) {
00134 boost::shared_ptr<RunPrincipal> primaryPrincipal = primaryFileSequence_->readRun_(rpCache);
00135 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), 0U, 0U);
00136 if (found) {
00137 boost::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
00138 checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00139 boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration()));
00140 secondaryRunPrincipal_ = secondaryFileSequence_->readRun_(rp);
00141 checkHistoryConsistency(*primaryPrincipal, *secondaryRunPrincipal_);
00142 primaryPrincipal->recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
00143 } else {
00144 throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
00145 << " Run " << primaryPrincipal->run()
00146 << " is not found in the secondary input files\n";
00147 }
00148 return primaryPrincipal;
00149 }
00150 return primaryFileSequence_->readRun_(rpCache);
00151 }
00152
00153 boost::shared_ptr<LuminosityBlockPrincipal>
00154 PoolSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00155 if (secondaryFileSequence_ && !branchIDsToReplace_[InLumi].empty()) {
00156 boost::shared_ptr<LuminosityBlockPrincipal> primaryPrincipal = primaryFileSequence_->readLuminosityBlock_(lbCache);
00157 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), primaryPrincipal->luminosityBlock(), 0U);
00158 if (found) {
00159 boost::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
00160 checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00161 boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration(), secondaryRunPrincipal_));
00162 secondaryLumiPrincipal_ = secondaryFileSequence_->readLuminosityBlock_(lbp);
00163 checkHistoryConsistency(*primaryPrincipal, *secondaryLumiPrincipal_);
00164 primaryPrincipal->recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
00165 } else {
00166 throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
00167 << " Run " << primaryPrincipal->run()
00168 << " LuminosityBlock " << primaryPrincipal->luminosityBlock()
00169 << " is not found in the secondary input files\n";
00170 }
00171 return primaryPrincipal;
00172 }
00173 return primaryFileSequence_->readLuminosityBlock_(lbCache);
00174 }
00175
00176 EventPrincipal*
00177 PoolSource::readEvent_() {
00178 EventSourceSentry(*this);
00179 EventPrincipal* primaryPrincipal = primaryFileSequence_->readEvent(*eventPrincipalCache(), luminosityBlockPrincipal());
00180 if (secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
00181 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(),
00182 primaryPrincipal->luminosityBlock(),
00183 primaryPrincipal->id().event());
00184 if (found) {
00185 EventPrincipal* secondaryPrincipal = secondaryFileSequence_->readEvent(*secondaryEventPrincipal_, secondaryLumiPrincipal_);
00186 checkConsistency(*primaryPrincipal, *secondaryPrincipal);
00187 checkHistoryConsistency(*primaryPrincipal, *secondaryPrincipal);
00188 primaryPrincipal->recombine(*secondaryPrincipal, branchIDsToReplace_[InEvent]);
00189 secondaryEventPrincipal_->clearPrincipal();
00190 } else {
00191 throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
00192 primaryPrincipal->id() << " is not found in the secondary input files\n";
00193 }
00194 }
00195 if(receiver_) {
00196 --numberOfEventsBeforeBigSkip_;
00197 }
00198 return primaryPrincipal;
00199 }
00200
00201 EventPrincipal*
00202 PoolSource::readIt(EventID const& id) {
00203 bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
00204 if (!found) return 0;
00205 return readEvent_();
00206 }
00207
00208 InputSource::ItemType
00209 PoolSource::getNextItemType() {
00210 if(receiver_ &&
00211 0 == numberOfEventsBeforeBigSkip_) {
00212 receiver_->receive();
00213 unsigned long toSkip = receiver_->numberToSkip();
00214 if (0 != toSkip) {
00215 primaryFileSequence_->skipEvents(toSkip, principalCache());
00216 decreaseRemainingEventsBy(toSkip);
00217 }
00218 numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00219 if (0 == numberOfEventsBeforeBigSkip_ or 0==remainingEvents()) {
00220 return IsStop;
00221 }
00222 }
00223 return primaryFileSequence_->getNextItemType();;
00224 }
00225
00226 void
00227 PoolSource::preForkReleaseResources() {
00228 primaryFileSequence_->closeFile_();
00229 }
00230
00231 void
00232 PoolSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource> iReceiver) {
00233 receiver_ = iReceiver;
00234 receiver_->receive();
00235 primaryFileSequence_->reset(principalCache());
00236 rewind();
00237 decreaseRemainingEventsBy(receiver_->numberToSkip());
00238 }
00239
00240
00241 void
00242 PoolSource::rewind_() {
00243 primaryFileSequence_->rewind_();
00244 if (receiver_) {
00245 unsigned int numberToSkip = receiver_->numberToSkip();
00246 if(0 != numberToSkip) {
00247 primaryFileSequence_->skipEvents(numberToSkip, principalCache());
00248 }
00249 numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00250 }
00251
00252 }
00253
00254
00255 void
00256 PoolSource::skip(int offset) {
00257 primaryFileSequence_->skipEvents(offset, principalCache());
00258 }
00259
00260 bool
00261 PoolSource::goToEvent_(EventID const& eventID) {
00262 return primaryFileSequence_->goToEvent(eventID, principalCache());
00263 }
00264
00265 void
00266 PoolSource::readMany_(int number, EventPrincipalVector& result) {
00267 assert (!secondaryFileSequence_);
00268 primaryFileSequence_->readMany(number, result);
00269 }
00270
00271 void
00272 PoolSource::readManyRandom_(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
00273 assert (!secondaryFileSequence_);
00274 primaryFileSequence_->readManyRandom(number, result, fileSeqNumber);
00275 }
00276
00277 void
00278 PoolSource::readManySequential_(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
00279 assert (!secondaryFileSequence_);
00280 primaryFileSequence_->readManySequential(number, result, fileSeqNumber);
00281 }
00282
00283 void
00284 PoolSource::readManySpecified_(std::vector<EventID> const& events, EventPrincipalVector& result) {
00285 assert (!secondaryFileSequence_);
00286 primaryFileSequence_->readManySpecified(events, result);
00287 }
00288
00289 void
00290 PoolSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
00291 assert (!secondaryFileSequence_);
00292 primaryFileSequence_->dropUnwantedBranches_(wantedBranches);
00293 }
00294
00295 void
00296 PoolSource::fillDescriptions(ConfigurationDescriptions & descriptions) {
00297
00298 ParameterSetDescription desc;
00299
00300 desc.setComment("Reads EDM/Root files.");
00301 VectorInputSource::fillDescription(desc);
00302 RootInputFileSequence::fillDescription(desc);
00303
00304 descriptions.add("source", desc);
00305 }
00306
00307 bool
00308 PoolSource::randomAccess_() const {
00309 return true;
00310 }
00311
00312 ProcessingController::ForwardState
00313 PoolSource::forwardState_() const {
00314 return primaryFileSequence_->forwardState();
00315 }
00316
00317 ProcessingController::ReverseState
00318 PoolSource::reverseState_() const {
00319 return primaryFileSequence_->reverseState();
00320 }
00321 }