00001
00002
00003 #include "PoolSource.h"
00004 #include "InputType.h"
00005 #include "RootInputFileSequence.h"
00006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00010 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00011 #include "FWCore/Framework/interface/RunPrincipal.h"
00012 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00013 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00014 #include "FWCore/Utilities/interface/EDMException.h"
00015 #include "FWCore/Utilities/interface/Exception.h"
00016
00017 #include <set>
00018
00019 namespace edm {
00020
00021 class LuminosityBlockID;
00022 class EventID;
00023
00024 namespace {
00025 void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
00026 ProcessHistory const& ph1 = primary.processHistory();
00027 ProcessHistory const& ph2 = secondary.processHistory();
00028 if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
00029 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00030 "The secondary file is not an ancestor of the primary file\n";
00031 }
00032 }
00033 void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
00034 if(!isSameEvent(primary, secondary)) {
00035 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00036 primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
00037 }
00038 }
00039 void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
00040 if(primary.id() != secondary.id()) {
00041 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00042 primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
00043 }
00044 }
00045 void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
00046 if(primary.id() != secondary.id()) {
00047 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00048 primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
00049 }
00050 }
00051 }
00052
00053 PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00054 VectorInputSource(pset, desc),
00055 rootServiceChecker_(),
00056 primaryFileSequence_(new RootInputFileSequence(pset, *this, catalog(), principalCache(),
00057 primary() ? InputType::Primary : InputType::SecondarySource)),
00058 secondaryFileSequence_(catalog(1).empty() ? 0 :
00059 new RootInputFileSequence(pset, *this, catalog(1), principalCache(), InputType::SecondaryFile)),
00060 secondaryRunPrincipal_(),
00061 secondaryLumiPrincipal_(),
00062 secondaryEventPrincipal_(secondaryFileSequence_ ? new EventPrincipal(secondaryFileSequence_->fileProductRegistry(), processConfiguration()) : 0),
00063 branchIDsToReplace_(),
00064 numberOfEventsBeforeBigSkip_(0) {
00065 if(secondaryFileSequence_) {
00066 assert(primary());
00067 boost::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
00068 ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
00069 ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
00070 typedef ProductRegistry::ProductList::const_iterator const_iterator;
00071 typedef ProductRegistry::ProductList::iterator iterator;
00072
00073 ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
00074 for(const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
00075 if(it->second.present()) {
00076 idsToReplace[it->second.branchType()].insert(it->second.branchID());
00077
00078 iterator itFound = fullList.find(it->first);
00079 if(itFound != fullList.end()) {
00080 itFound->second.dropped()=false;
00081 }
00082 }
00083 }
00084 for(const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
00085 if(it->second.present()) idsToReplace[it->second.branchType()].erase(it->second.branchID());
00086 }
00087 if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
00088 secondaryFileSequence_.reset();
00089 } else {
00090 for(int i = InEvent; i < NumBranchTypes; ++i) {
00091 branchIDsToReplace_[i].reserve(idsToReplace[i].size());
00092 for(std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
00093 it != itEnd; ++it) {
00094 branchIDsToReplace_[i].push_back(*it);
00095 }
00096 }
00097 }
00098 }
00099 }
00100
00101 PoolSource::~PoolSource() {}
00102
00103 void
00104 PoolSource::endJob() {
00105 if(secondaryFileSequence_) secondaryFileSequence_->endJob();
00106 primaryFileSequence_->endJob();
00107 }
00108
00109 boost::shared_ptr<FileBlock>
00110 PoolSource::readFile_() {
00111 boost::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_(principalCache());
00112 if(secondaryFileSequence_) {
00113 fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
00114 }
00115 return fb;
00116 }
00117
00118 void PoolSource::closeFile_() {
00119 primaryFileSequence_->closeFile_();
00120 }
00121
00122 boost::shared_ptr<RunAuxiliary>
00123 PoolSource::readRunAuxiliary_() {
00124 return primaryFileSequence_->readRunAuxiliary_();
00125 }
00126
00127 boost::shared_ptr<LuminosityBlockAuxiliary>
00128 PoolSource::readLuminosityBlockAuxiliary_() {
00129 return primaryFileSequence_->readLuminosityBlockAuxiliary_();
00130 }
00131
00132 boost::shared_ptr<RunPrincipal>
00133 PoolSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00134 if(secondaryFileSequence_ && !branchIDsToReplace_[InRun].empty()) {
00135 boost::shared_ptr<RunPrincipal> primaryPrincipal = primaryFileSequence_->readRun_(rpCache);
00136 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), 0U, 0U);
00137 if(found) {
00138 boost::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
00139 checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00140 boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration()));
00141 secondaryRunPrincipal_ = secondaryFileSequence_->readRun_(rp);
00142 checkHistoryConsistency(*primaryPrincipal, *secondaryRunPrincipal_);
00143 primaryPrincipal->recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
00144 } else {
00145 throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
00146 << " Run " << primaryPrincipal->run()
00147 << " is not found in the secondary input files\n";
00148 }
00149 return primaryPrincipal;
00150 }
00151 return primaryFileSequence_->readRun_(rpCache);
00152 }
00153
00154 boost::shared_ptr<LuminosityBlockPrincipal>
00155 PoolSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00156 if(secondaryFileSequence_ && !branchIDsToReplace_[InLumi].empty()) {
00157 boost::shared_ptr<LuminosityBlockPrincipal> primaryPrincipal = primaryFileSequence_->readLuminosityBlock_(lbCache);
00158 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), primaryPrincipal->luminosityBlock(), 0U);
00159 if(found) {
00160 boost::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
00161 checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00162 boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration(), secondaryRunPrincipal_));
00163 secondaryLumiPrincipal_ = secondaryFileSequence_->readLuminosityBlock_(lbp);
00164 checkHistoryConsistency(*primaryPrincipal, *secondaryLumiPrincipal_);
00165 primaryPrincipal->recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
00166 } else {
00167 throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
00168 << " Run " << primaryPrincipal->run()
00169 << " LuminosityBlock " << primaryPrincipal->luminosityBlock()
00170 << " is not found in the secondary input files\n";
00171 }
00172 return primaryPrincipal;
00173 }
00174 return primaryFileSequence_->readLuminosityBlock_(lbCache);
00175 }
00176
00177 EventPrincipal*
00178 PoolSource::readEvent_() {
00179 EventSourceSentry(*this);
00180 EventPrincipal* primaryPrincipal = primaryFileSequence_->readEvent(*eventPrincipalCache(), luminosityBlockPrincipal());
00181 if(secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
00182 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(),
00183 primaryPrincipal->luminosityBlock(),
00184 primaryPrincipal->id().event());
00185 if(found) {
00186 EventPrincipal* secondaryPrincipal = secondaryFileSequence_->readEvent(*secondaryEventPrincipal_, secondaryLumiPrincipal_);
00187 checkConsistency(*primaryPrincipal, *secondaryPrincipal);
00188 checkHistoryConsistency(*primaryPrincipal, *secondaryPrincipal);
00189 primaryPrincipal->recombine(*secondaryPrincipal, branchIDsToReplace_[InEvent]);
00190 secondaryEventPrincipal_->clearPrincipal();
00191 } else {
00192 throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
00193 primaryPrincipal->id() << " is not found in the secondary input files\n";
00194 }
00195 }
00196 if(receiver_) {
00197 --numberOfEventsBeforeBigSkip_;
00198 }
00199 return primaryPrincipal;
00200 }
00201
00202 EventPrincipal*
00203 PoolSource::readIt(EventID const& id) {
00204 bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
00205 if(!found) return 0;
00206 return readEvent_();
00207 }
00208
00209 InputSource::ItemType
00210 PoolSource::getNextItemType() {
00211 if(receiver_ &&
00212 0 == numberOfEventsBeforeBigSkip_) {
00213 receiver_->receive();
00214 unsigned long toSkip = receiver_->numberToSkip();
00215 if(0 != toSkip) {
00216 primaryFileSequence_->skipEvents(toSkip, principalCache());
00217 decreaseRemainingEventsBy(toSkip);
00218 }
00219 numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00220 if(0 == numberOfEventsBeforeBigSkip_ or 0==remainingEvents()) {
00221 return IsStop;
00222 }
00223 }
00224 return primaryFileSequence_->getNextItemType();;
00225 }
00226
00227 void
00228 PoolSource::preForkReleaseResources() {
00229 primaryFileSequence_->closeFile_();
00230 }
00231
00232 void
00233 PoolSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource> iReceiver) {
00234 receiver_ = iReceiver;
00235 receiver_->receive();
00236 primaryFileSequence_->reset(principalCache());
00237 rewind();
00238 decreaseRemainingEventsBy(receiver_->numberToSkip());
00239 }
00240
00241
00242 void
00243 PoolSource::rewind_() {
00244 primaryFileSequence_->rewind_();
00245 if(receiver_) {
00246 unsigned int numberToSkip = receiver_->numberToSkip();
00247 if(0 != numberToSkip) {
00248 primaryFileSequence_->skipEvents(numberToSkip, principalCache());
00249 }
00250 numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00251 }
00252
00253 }
00254
00255
00256 void
00257 PoolSource::skip(int offset) {
00258 primaryFileSequence_->skipEvents(offset, principalCache());
00259 }
00260
00261 bool
00262 PoolSource::goToEvent_(EventID const& eventID) {
00263 return primaryFileSequence_->goToEvent(eventID);
00264 }
00265
00266 EventPrincipal*
00267 PoolSource::readOneRandom() {
00268 assert(!secondaryFileSequence_);
00269 return primaryFileSequence_->readOneRandom();
00270 }
00271
00272 EventPrincipal*
00273 PoolSource::readOneSequential() {
00274 assert(!secondaryFileSequence_);
00275 return primaryFileSequence_->readOneSequential();
00276 }
00277
00278 EventPrincipal*
00279 PoolSource::readOneSpecified(EventID const& id) {
00280 assert(!secondaryFileSequence_);
00281 return primaryFileSequence_->readOneSpecified(id);
00282 }
00283
00284 void
00285 PoolSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
00286 assert(!secondaryFileSequence_);
00287 primaryFileSequence_->dropUnwantedBranches_(wantedBranches);
00288 }
00289
00290 void
00291 PoolSource::fillDescriptions(ConfigurationDescriptions & descriptions) {
00292
00293 ParameterSetDescription desc;
00294
00295 desc.setComment("Reads EDM/Root files.");
00296 VectorInputSource::fillDescription(desc);
00297 RootInputFileSequence::fillDescription(desc);
00298
00299 descriptions.add("source", desc);
00300 }
00301
00302 bool
00303 PoolSource::randomAccess_() const {
00304 return true;
00305 }
00306
00307 ProcessingController::ForwardState
00308 PoolSource::forwardState_() const {
00309 return primaryFileSequence_->forwardState();
00310 }
00311
00312 ProcessingController::ReverseState
00313 PoolSource::reverseState_() const {
00314 return primaryFileSequence_->reverseState();
00315 }
00316 }