CMS 3D CMS Logo

/data/git/CMSSW_5_3_11_patch5/src/IOPool/Input/src/PoolSource.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "PoolSource.h"
00004 #include "InputFile.h"
00005 #include "InputType.h"
00006 #include "RootInputFileSequence.h"
00007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00008 #include "FWCore/Framework/interface/EventPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00011 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00012 #include "FWCore/Framework/interface/RunPrincipal.h"
00013 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00014 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00015 #include "FWCore/Utilities/interface/EDMException.h"
00016 #include "FWCore/Utilities/interface/Exception.h"
00017 
00018 #include <set>
00019 
00020 namespace edm {
00021 
00022   class LuminosityBlockID;
00023   class EventID;
00024 
00025   namespace {
00026     void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
00027       ProcessHistory const& ph1 = primary.processHistory();
00028       ProcessHistory const& ph2 = secondary.processHistory();
00029       if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
00030         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00031           "The secondary file is not an ancestor of the primary file\n";
00032       }
00033     }
00034     void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
00035       if(!isSameEvent(primary, secondary)) {
00036         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00037           primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
00038       }
00039     }
00040     void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
00041       if(primary.id() != secondary.id()) {
00042         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00043           primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
00044       }
00045     }
00046     void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
00047       if(primary.id() != secondary.id()) {
00048         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00049           primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
00050       }
00051     }
00052   }
00053 
00054   PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00055     VectorInputSource(pset, desc),
00056     rootServiceChecker_(),
00057     primaryFileSequence_(new RootInputFileSequence(pset, *this, catalog(), principalCache(),
00058                                                    primary() ? InputType::Primary : InputType::SecondarySource)),
00059     secondaryFileSequence_(catalog(1).empty() ? 0 :
00060                            new RootInputFileSequence(pset, *this, catalog(1), principalCache(), InputType::SecondaryFile)),
00061     secondaryRunPrincipal_(),
00062     secondaryLumiPrincipal_(),
00063     secondaryEventPrincipal_(secondaryFileSequence_ ? new EventPrincipal(secondaryFileSequence_->fileProductRegistry(), processConfiguration()) : 0),
00064     branchIDsToReplace_(),
00065     numberOfEventsBeforeBigSkip_(0) {
00066     if(secondaryFileSequence_) {
00067       assert(primary());
00068       std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
00069       ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
00070       ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
00071       typedef ProductRegistry::ProductList::const_iterator const_iterator;
00072       typedef ProductRegistry::ProductList::iterator iterator;
00073       //this is the registry used by the 'outside' world and only has the primary file information in it at present
00074       ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
00075       for(const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
00076         if(it->second.present()) {
00077           idsToReplace[it->second.branchType()].insert(it->second.branchID());
00078           //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
00079           iterator itFound = fullList.find(it->first);
00080           if(itFound != fullList.end()) {
00081             itFound->second.dropped()=false;
00082           }
00083         }
00084       }
00085       for(const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
00086         if(it->second.present()) idsToReplace[it->second.branchType()].erase(it->second.branchID());
00087       }
00088       if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
00089         secondaryFileSequence_.reset();
00090       } else {
00091         for(int i = InEvent; i < NumBranchTypes; ++i) {
00092           branchIDsToReplace_[i].reserve(idsToReplace[i].size());
00093           for(std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
00094                it != itEnd; ++it) {
00095             branchIDsToReplace_[i].push_back(*it);
00096           }
00097         }
00098       }
00099     }
00100   }
00101 
00102   PoolSource::~PoolSource() {}
00103 
00104   void
00105   PoolSource::endJob() {
00106     if(secondaryFileSequence_) secondaryFileSequence_->endJob();
00107     primaryFileSequence_->endJob();
00108     InputFile::reportReadBranches();
00109   }
00110 
00111   boost::shared_ptr<FileBlock>
00112   PoolSource::readFile_() {
00113     boost::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_(principalCache());
00114     if(secondaryFileSequence_) {
00115       fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
00116     }
00117     return fb;
00118   }
00119 
00120   void PoolSource::closeFile_() {
00121     primaryFileSequence_->closeFile_();
00122   }
00123 
00124   boost::shared_ptr<RunAuxiliary>
00125   PoolSource::readRunAuxiliary_() {
00126     return primaryFileSequence_->readRunAuxiliary_();
00127   }
00128 
00129   boost::shared_ptr<LuminosityBlockAuxiliary>
00130   PoolSource::readLuminosityBlockAuxiliary_() {
00131     return primaryFileSequence_->readLuminosityBlockAuxiliary_();
00132   }
00133 
00134   boost::shared_ptr<RunPrincipal>
00135   PoolSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00136     if(secondaryFileSequence_ && !branchIDsToReplace_[InRun].empty()) {
00137       boost::shared_ptr<RunPrincipal> primaryPrincipal = primaryFileSequence_->readRun_(rpCache);
00138       bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), 0U, 0U);
00139       if(found) {
00140         boost::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
00141         checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00142         boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration()));
00143         secondaryRunPrincipal_ = secondaryFileSequence_->readRun_(rp);
00144         checkHistoryConsistency(*primaryPrincipal, *secondaryRunPrincipal_);
00145         primaryPrincipal->recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
00146       } else {
00147         throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
00148           << " Run " << primaryPrincipal->run()
00149           << " is not found in the secondary input files\n";
00150       }
00151       return primaryPrincipal;
00152     }
00153     return primaryFileSequence_->readRun_(rpCache);
00154   }
00155 
00156   boost::shared_ptr<LuminosityBlockPrincipal>
00157   PoolSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00158     if(secondaryFileSequence_ && !branchIDsToReplace_[InLumi].empty()) {
00159       boost::shared_ptr<LuminosityBlockPrincipal> primaryPrincipal = primaryFileSequence_->readLuminosityBlock_(lbCache);
00160       bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), primaryPrincipal->luminosityBlock(), 0U);
00161       if(found) {
00162         boost::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
00163         checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00164         boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration(), secondaryRunPrincipal_));
00165         secondaryLumiPrincipal_ = secondaryFileSequence_->readLuminosityBlock_(lbp);
00166         checkHistoryConsistency(*primaryPrincipal, *secondaryLumiPrincipal_);
00167         primaryPrincipal->recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
00168       } else {
00169         throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
00170           << " Run " << primaryPrincipal->run()
00171           << " LuminosityBlock " << primaryPrincipal->luminosityBlock()
00172           << " is not found in the secondary input files\n";
00173       }
00174       return primaryPrincipal;
00175     }
00176     return primaryFileSequence_->readLuminosityBlock_(lbCache);
00177   }
00178 
00179   EventPrincipal*
00180   PoolSource::readEvent_() {
00181     EventSourceSentry(*this);
00182     EventPrincipal* primaryPrincipal = primaryFileSequence_->readEvent(*eventPrincipalCache(), luminosityBlockPrincipal());
00183     if(secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
00184       bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(),
00185                                                       primaryPrincipal->luminosityBlock(),
00186                                                       primaryPrincipal->id().event());
00187       if(found) {
00188         EventPrincipal* secondaryPrincipal = secondaryFileSequence_->readEvent(*secondaryEventPrincipal_, secondaryLumiPrincipal_);
00189         checkConsistency(*primaryPrincipal, *secondaryPrincipal);
00190         checkHistoryConsistency(*primaryPrincipal, *secondaryPrincipal);
00191         primaryPrincipal->recombine(*secondaryPrincipal, branchIDsToReplace_[InEvent]);
00192         primaryPrincipal->mergeMappers(*secondaryPrincipal);
00193         secondaryEventPrincipal_->clearPrincipal();
00194       } else {
00195         throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
00196           primaryPrincipal->id() << " is not found in the secondary input files\n";
00197       }
00198     }
00199     if(receiver_) {
00200       --numberOfEventsBeforeBigSkip_;
00201     }
00202     return primaryPrincipal;
00203   }
00204 
00205   EventPrincipal*
00206   PoolSource::readIt(EventID const& id) {
00207     bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
00208     if(!found) return 0;
00209     return readEvent_();
00210   }
00211 
00212   InputSource::ItemType
00213   PoolSource::getNextItemType() {
00214     if(receiver_ &&
00215        0 == numberOfEventsBeforeBigSkip_) {
00216       receiver_->receive();
00217       unsigned long toSkip = receiver_->numberToSkip();
00218       if(0 != toSkip) {
00219         primaryFileSequence_->skipEvents(toSkip, principalCache());
00220         decreaseRemainingEventsBy(toSkip);
00221       }
00222       numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00223       if(0 == numberOfEventsBeforeBigSkip_ or 0==remainingEvents()) {
00224         return IsStop;
00225       }
00226     }
00227     return primaryFileSequence_->getNextItemType();;
00228   }
00229 
00230   void
00231   PoolSource::preForkReleaseResources() {
00232     primaryFileSequence_->closeFile_();
00233   }
00234 
00235   void
00236   PoolSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource> iReceiver) {
00237     receiver_ = iReceiver;
00238     receiver_->receive();
00239     primaryFileSequence_->reset(principalCache());
00240     rewind();
00241     decreaseRemainingEventsBy(receiver_->numberToSkip());
00242   }
00243 
00244   // Rewind to before the first event that was read.
00245   void
00246   PoolSource::rewind_() {
00247     primaryFileSequence_->rewind_();
00248     if(receiver_) {
00249       unsigned int numberToSkip = receiver_->numberToSkip();
00250       if(0 != numberToSkip) {
00251         primaryFileSequence_->skipEvents(numberToSkip, principalCache());
00252       }
00253       numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00254     }
00255 
00256   }
00257 
00258   // Advance "offset" events.  Offset can be positive or negative (or zero).
00259   void
00260   PoolSource::skip(int offset) {
00261     primaryFileSequence_->skipEvents(offset, principalCache());
00262   }
00263 
00264   bool
00265   PoolSource::goToEvent_(EventID const& eventID) {
00266     return primaryFileSequence_->goToEvent(eventID);
00267   }
00268 
00269   EventPrincipal*
00270   PoolSource::readOneRandom() {
00271     assert(!secondaryFileSequence_);
00272     return primaryFileSequence_->readOneRandom();
00273   }
00274 
00275   EventPrincipal*
00276   PoolSource::readOneRandomWithID(LuminosityBlockID const& lumiID) {
00277     assert(!secondaryFileSequence_);
00278     return primaryFileSequence_->readOneRandomWithID(lumiID);
00279   }
00280 
00281   EventPrincipal*
00282   PoolSource::readOneSequential() {
00283     assert(!secondaryFileSequence_);
00284     return primaryFileSequence_->readOneSequential();
00285   }
00286 
00287   EventPrincipal*
00288   PoolSource::readOneSequentialWithID(LuminosityBlockID const& lumiID) {
00289     assert(!secondaryFileSequence_);
00290     return primaryFileSequence_->readOneSequentialWithID(lumiID);
00291   }
00292 
00293   EventPrincipal*
00294   PoolSource::readOneSpecified(EventID const& id) {
00295     assert(!secondaryFileSequence_);
00296     return primaryFileSequence_->readOneSpecified(id);
00297   }
00298 
00299   void
00300   PoolSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
00301     assert(!secondaryFileSequence_);
00302     primaryFileSequence_->dropUnwantedBranches_(wantedBranches);
00303   }
00304 
00305   void
00306   PoolSource::fillDescriptions(ConfigurationDescriptions & descriptions) {
00307 
00308     ParameterSetDescription desc;
00309 
00310     desc.setComment("Reads EDM/Root files.");
00311     VectorInputSource::fillDescription(desc);
00312     RootInputFileSequence::fillDescription(desc);
00313 
00314     descriptions.add("source", desc);
00315   }
00316 
00317   bool
00318   PoolSource::randomAccess_() const {
00319     return true;
00320   }
00321 
00322   ProcessingController::ForwardState
00323   PoolSource::forwardState_() const {
00324     return primaryFileSequence_->forwardState();
00325   }
00326 
00327   ProcessingController::ReverseState
00328   PoolSource::reverseState_() const {
00329     return primaryFileSequence_->reverseState();
00330   }
00331 }