00001
00002
00003 #include "PoolSource.h"
00004 #include "InputFile.h"
00005 #include "InputType.h"
00006 #include "RootInputFileSequence.h"
00007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00008 #include "FWCore/Framework/interface/EventPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00011 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00012 #include "FWCore/Framework/interface/RunPrincipal.h"
00013 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00014 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00015 #include "FWCore/Utilities/interface/EDMException.h"
00016 #include "FWCore/Utilities/interface/Exception.h"
00017
00018 #include <set>
00019
00020 namespace edm {
00021
00022 class LuminosityBlockID;
00023 class EventID;
00024
00025 namespace {
00026 void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
00027 ProcessHistory const& ph1 = primary.processHistory();
00028 ProcessHistory const& ph2 = secondary.processHistory();
00029 if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
00030 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00031 "The secondary file is not an ancestor of the primary file\n";
00032 }
00033 }
00034 void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
00035 if(!isSameEvent(primary, secondary)) {
00036 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00037 primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
00038 }
00039 }
00040 void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
00041 if(primary.id() != secondary.id()) {
00042 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00043 primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
00044 }
00045 }
00046 void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
00047 if(primary.id() != secondary.id()) {
00048 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00049 primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
00050 }
00051 }
00052 }
00053
00054 PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00055 VectorInputSource(pset, desc),
00056 rootServiceChecker_(),
00057 primaryFileSequence_(new RootInputFileSequence(pset, *this, catalog(), principalCache(),
00058 primary() ? InputType::Primary : InputType::SecondarySource)),
00059 secondaryFileSequence_(catalog(1).empty() ? 0 :
00060 new RootInputFileSequence(pset, *this, catalog(1), principalCache(), InputType::SecondaryFile)),
00061 secondaryRunPrincipal_(),
00062 secondaryLumiPrincipal_(),
00063 secondaryEventPrincipal_(secondaryFileSequence_ ? new EventPrincipal(secondaryFileSequence_->fileProductRegistry(), processConfiguration()) : 0),
00064 branchIDsToReplace_(),
00065 numberOfEventsBeforeBigSkip_(0) {
00066 if(secondaryFileSequence_) {
00067 assert(primary());
00068 std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
00069 ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
00070 ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
00071 typedef ProductRegistry::ProductList::const_iterator const_iterator;
00072 typedef ProductRegistry::ProductList::iterator iterator;
00073
00074 ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
00075 for(const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
00076 if(it->second.present()) {
00077 idsToReplace[it->second.branchType()].insert(it->second.branchID());
00078
00079 iterator itFound = fullList.find(it->first);
00080 if(itFound != fullList.end()) {
00081 itFound->second.dropped()=false;
00082 }
00083 }
00084 }
00085 for(const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
00086 if(it->second.present()) idsToReplace[it->second.branchType()].erase(it->second.branchID());
00087 }
00088 if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
00089 secondaryFileSequence_.reset();
00090 } else {
00091 for(int i = InEvent; i < NumBranchTypes; ++i) {
00092 branchIDsToReplace_[i].reserve(idsToReplace[i].size());
00093 for(std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
00094 it != itEnd; ++it) {
00095 branchIDsToReplace_[i].push_back(*it);
00096 }
00097 }
00098 }
00099 }
00100 }
00101
00102 PoolSource::~PoolSource() {}
00103
00104 void
00105 PoolSource::endJob() {
00106 if(secondaryFileSequence_) secondaryFileSequence_->endJob();
00107 primaryFileSequence_->endJob();
00108 InputFile::reportReadBranches();
00109 }
00110
00111 boost::shared_ptr<FileBlock>
00112 PoolSource::readFile_() {
00113 boost::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_(principalCache());
00114 if(secondaryFileSequence_) {
00115 fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
00116 }
00117 return fb;
00118 }
00119
00120 void PoolSource::closeFile_() {
00121 primaryFileSequence_->closeFile_();
00122 }
00123
00124 boost::shared_ptr<RunAuxiliary>
00125 PoolSource::readRunAuxiliary_() {
00126 return primaryFileSequence_->readRunAuxiliary_();
00127 }
00128
00129 boost::shared_ptr<LuminosityBlockAuxiliary>
00130 PoolSource::readLuminosityBlockAuxiliary_() {
00131 return primaryFileSequence_->readLuminosityBlockAuxiliary_();
00132 }
00133
00134 boost::shared_ptr<RunPrincipal>
00135 PoolSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00136 if(secondaryFileSequence_ && !branchIDsToReplace_[InRun].empty()) {
00137 boost::shared_ptr<RunPrincipal> primaryPrincipal = primaryFileSequence_->readRun_(rpCache);
00138 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), 0U, 0U);
00139 if(found) {
00140 boost::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
00141 checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00142 boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration()));
00143 secondaryRunPrincipal_ = secondaryFileSequence_->readRun_(rp);
00144 checkHistoryConsistency(*primaryPrincipal, *secondaryRunPrincipal_);
00145 primaryPrincipal->recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
00146 } else {
00147 throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
00148 << " Run " << primaryPrincipal->run()
00149 << " is not found in the secondary input files\n";
00150 }
00151 return primaryPrincipal;
00152 }
00153 return primaryFileSequence_->readRun_(rpCache);
00154 }
00155
00156 boost::shared_ptr<LuminosityBlockPrincipal>
00157 PoolSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00158 if(secondaryFileSequence_ && !branchIDsToReplace_[InLumi].empty()) {
00159 boost::shared_ptr<LuminosityBlockPrincipal> primaryPrincipal = primaryFileSequence_->readLuminosityBlock_(lbCache);
00160 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), primaryPrincipal->luminosityBlock(), 0U);
00161 if(found) {
00162 boost::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
00163 checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00164 boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration(), secondaryRunPrincipal_));
00165 secondaryLumiPrincipal_ = secondaryFileSequence_->readLuminosityBlock_(lbp);
00166 checkHistoryConsistency(*primaryPrincipal, *secondaryLumiPrincipal_);
00167 primaryPrincipal->recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
00168 } else {
00169 throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
00170 << " Run " << primaryPrincipal->run()
00171 << " LuminosityBlock " << primaryPrincipal->luminosityBlock()
00172 << " is not found in the secondary input files\n";
00173 }
00174 return primaryPrincipal;
00175 }
00176 return primaryFileSequence_->readLuminosityBlock_(lbCache);
00177 }
00178
00179 EventPrincipal*
00180 PoolSource::readEvent_() {
00181 EventSourceSentry(*this);
00182 EventPrincipal* primaryPrincipal = primaryFileSequence_->readEvent(*eventPrincipalCache(), luminosityBlockPrincipal());
00183 if(secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
00184 bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(),
00185 primaryPrincipal->luminosityBlock(),
00186 primaryPrincipal->id().event());
00187 if(found) {
00188 EventPrincipal* secondaryPrincipal = secondaryFileSequence_->readEvent(*secondaryEventPrincipal_, secondaryLumiPrincipal_);
00189 checkConsistency(*primaryPrincipal, *secondaryPrincipal);
00190 checkHistoryConsistency(*primaryPrincipal, *secondaryPrincipal);
00191 primaryPrincipal->recombine(*secondaryPrincipal, branchIDsToReplace_[InEvent]);
00192 primaryPrincipal->mergeMappers(*secondaryPrincipal);
00193 secondaryEventPrincipal_->clearPrincipal();
00194 } else {
00195 throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
00196 primaryPrincipal->id() << " is not found in the secondary input files\n";
00197 }
00198 }
00199 if(receiver_) {
00200 --numberOfEventsBeforeBigSkip_;
00201 }
00202 return primaryPrincipal;
00203 }
00204
00205 EventPrincipal*
00206 PoolSource::readIt(EventID const& id) {
00207 bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
00208 if(!found) return 0;
00209 return readEvent_();
00210 }
00211
00212 InputSource::ItemType
00213 PoolSource::getNextItemType() {
00214 if(receiver_ &&
00215 0 == numberOfEventsBeforeBigSkip_) {
00216 receiver_->receive();
00217 unsigned long toSkip = receiver_->numberToSkip();
00218 if(0 != toSkip) {
00219 primaryFileSequence_->skipEvents(toSkip, principalCache());
00220 decreaseRemainingEventsBy(toSkip);
00221 }
00222 numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00223 if(0 == numberOfEventsBeforeBigSkip_ or 0==remainingEvents()) {
00224 return IsStop;
00225 }
00226 }
00227 return primaryFileSequence_->getNextItemType();;
00228 }
00229
00230 void
00231 PoolSource::preForkReleaseResources() {
00232 primaryFileSequence_->closeFile_();
00233 }
00234
00235 void
00236 PoolSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource> iReceiver) {
00237 receiver_ = iReceiver;
00238 receiver_->receive();
00239 primaryFileSequence_->reset(principalCache());
00240 rewind();
00241 decreaseRemainingEventsBy(receiver_->numberToSkip());
00242 }
00243
00244
00245 void
00246 PoolSource::rewind_() {
00247 primaryFileSequence_->rewind_();
00248 if(receiver_) {
00249 unsigned int numberToSkip = receiver_->numberToSkip();
00250 if(0 != numberToSkip) {
00251 primaryFileSequence_->skipEvents(numberToSkip, principalCache());
00252 }
00253 numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00254 }
00255
00256 }
00257
00258
00259 void
00260 PoolSource::skip(int offset) {
00261 primaryFileSequence_->skipEvents(offset, principalCache());
00262 }
00263
00264 bool
00265 PoolSource::goToEvent_(EventID const& eventID) {
00266 return primaryFileSequence_->goToEvent(eventID);
00267 }
00268
00269 EventPrincipal*
00270 PoolSource::readOneRandom() {
00271 assert(!secondaryFileSequence_);
00272 return primaryFileSequence_->readOneRandom();
00273 }
00274
00275 EventPrincipal*
00276 PoolSource::readOneRandomWithID(LuminosityBlockID const& lumiID) {
00277 assert(!secondaryFileSequence_);
00278 return primaryFileSequence_->readOneRandomWithID(lumiID);
00279 }
00280
00281 EventPrincipal*
00282 PoolSource::readOneSequential() {
00283 assert(!secondaryFileSequence_);
00284 return primaryFileSequence_->readOneSequential();
00285 }
00286
00287 EventPrincipal*
00288 PoolSource::readOneSequentialWithID(LuminosityBlockID const& lumiID) {
00289 assert(!secondaryFileSequence_);
00290 return primaryFileSequence_->readOneSequentialWithID(lumiID);
00291 }
00292
00293 EventPrincipal*
00294 PoolSource::readOneSpecified(EventID const& id) {
00295 assert(!secondaryFileSequence_);
00296 return primaryFileSequence_->readOneSpecified(id);
00297 }
00298
00299 void
00300 PoolSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
00301 assert(!secondaryFileSequence_);
00302 primaryFileSequence_->dropUnwantedBranches_(wantedBranches);
00303 }
00304
00305 void
00306 PoolSource::fillDescriptions(ConfigurationDescriptions & descriptions) {
00307
00308 ParameterSetDescription desc;
00309
00310 desc.setComment("Reads EDM/Root files.");
00311 VectorInputSource::fillDescription(desc);
00312 RootInputFileSequence::fillDescription(desc);
00313
00314 descriptions.add("source", desc);
00315 }
00316
00317 bool
00318 PoolSource::randomAccess_() const {
00319 return true;
00320 }
00321
00322 ProcessingController::ForwardState
00323 PoolSource::forwardState_() const {
00324 return primaryFileSequence_->forwardState();
00325 }
00326
00327 ProcessingController::ReverseState
00328 PoolSource::reverseState_() const {
00329 return primaryFileSequence_->reverseState();
00330 }
00331 }