CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/IOPool/Input/src/PoolSource.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "PoolSource.h"
00004 #include "InputType.h"
00005 #include "RootInputFileSequence.h"
00006 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00010 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00011 #include "FWCore/Framework/interface/RunPrincipal.h"
00012 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00013 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00014 #include "FWCore/Utilities/interface/EDMException.h"
00015 #include "FWCore/Utilities/interface/Exception.h"
00016 
00017 #include <set>
00018 
00019 namespace edm {
00020 
00021   class LuminosityBlockID;
00022   class EventID;
00023 
00024   namespace {
00025     void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
00026       ProcessHistory const& ph1 = primary.processHistory();
00027       ProcessHistory const& ph2 = secondary.processHistory();
00028       if(ph1 != ph2 && !isAncestor(ph2, ph1)) {
00029         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00030           "The secondary file is not an ancestor of the primary file\n";
00031       }
00032     }
00033     void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
00034       if(!isSameEvent(primary, secondary)) {
00035         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00036           primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
00037       }
00038     }
00039     void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
00040       if(primary.id() != secondary.id()) {
00041         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00042           primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
00043       }
00044     }
00045     void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
00046       if(primary.id() != secondary.id()) {
00047         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency") <<
00048           primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
00049       }
00050     }
00051   }
00052 
00053   PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00054     VectorInputSource(pset, desc),
00055     rootServiceChecker_(),
00056     primaryFileSequence_(new RootInputFileSequence(pset, *this, catalog(), principalCache(),
00057                                                    primary() ? InputType::Primary : InputType::SecondarySource)),
00058     secondaryFileSequence_(catalog(1).empty() ? 0 :
00059                            new RootInputFileSequence(pset, *this, catalog(1), principalCache(), InputType::SecondaryFile)),
00060     secondaryRunPrincipal_(),
00061     secondaryLumiPrincipal_(),
00062     secondaryEventPrincipal_(secondaryFileSequence_ ? new EventPrincipal(secondaryFileSequence_->fileProductRegistry(), processConfiguration()) : 0),
00063     branchIDsToReplace_(),
00064     numberOfEventsBeforeBigSkip_(0) {
00065     if(secondaryFileSequence_) {
00066       assert(primary());
00067       boost::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
00068       ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
00069       ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
00070       typedef ProductRegistry::ProductList::const_iterator const_iterator;
00071       typedef ProductRegistry::ProductList::iterator iterator;
00072       //this is the registry used by the 'outside' world and only has the primary file information in it at present
00073       ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
00074       for(const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
00075         if(it->second.present()) {
00076           idsToReplace[it->second.branchType()].insert(it->second.branchID());
00077           //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
00078           iterator itFound = fullList.find(it->first);
00079           if(itFound != fullList.end()) {
00080             itFound->second.dropped()=false;
00081           }
00082         }
00083       }
00084       for(const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
00085         if(it->second.present()) idsToReplace[it->second.branchType()].erase(it->second.branchID());
00086       }
00087       if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
00088         secondaryFileSequence_.reset();
00089       } else {
00090         for(int i = InEvent; i < NumBranchTypes; ++i) {
00091           branchIDsToReplace_[i].reserve(idsToReplace[i].size());
00092           for(std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
00093                it != itEnd; ++it) {
00094             branchIDsToReplace_[i].push_back(*it);
00095           }
00096         }
00097       }
00098     }
00099   }
00100 
00101   PoolSource::~PoolSource() {}
00102 
00103   void
00104   PoolSource::endJob() {
00105     if(secondaryFileSequence_) secondaryFileSequence_->endJob();
00106     primaryFileSequence_->endJob();
00107   }
00108 
00109   boost::shared_ptr<FileBlock>
00110   PoolSource::readFile_() {
00111     boost::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_(principalCache());
00112     if(secondaryFileSequence_) {
00113       fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
00114     }
00115     return fb;
00116   }
00117 
00118   void PoolSource::closeFile_() {
00119     primaryFileSequence_->closeFile_();
00120   }
00121 
00122   boost::shared_ptr<RunAuxiliary>
00123   PoolSource::readRunAuxiliary_() {
00124     return primaryFileSequence_->readRunAuxiliary_();
00125   }
00126 
00127   boost::shared_ptr<LuminosityBlockAuxiliary>
00128   PoolSource::readLuminosityBlockAuxiliary_() {
00129     return primaryFileSequence_->readLuminosityBlockAuxiliary_();
00130   }
00131 
00132   boost::shared_ptr<RunPrincipal>
00133   PoolSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00134     if(secondaryFileSequence_ && !branchIDsToReplace_[InRun].empty()) {
00135       boost::shared_ptr<RunPrincipal> primaryPrincipal = primaryFileSequence_->readRun_(rpCache);
00136       bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), 0U, 0U);
00137       if(found) {
00138         boost::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
00139         checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00140         boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration()));
00141         secondaryRunPrincipal_ = secondaryFileSequence_->readRun_(rp);
00142         checkHistoryConsistency(*primaryPrincipal, *secondaryRunPrincipal_);
00143         primaryPrincipal->recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
00144       } else {
00145         throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
00146           << " Run " << primaryPrincipal->run()
00147           << " is not found in the secondary input files\n";
00148       }
00149       return primaryPrincipal;
00150     }
00151     return primaryFileSequence_->readRun_(rpCache);
00152   }
00153 
00154   boost::shared_ptr<LuminosityBlockPrincipal>
00155   PoolSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00156     if(secondaryFileSequence_ && !branchIDsToReplace_[InLumi].empty()) {
00157       boost::shared_ptr<LuminosityBlockPrincipal> primaryPrincipal = primaryFileSequence_->readLuminosityBlock_(lbCache);
00158       bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(), primaryPrincipal->luminosityBlock(), 0U);
00159       if(found) {
00160         boost::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readLuminosityBlockAuxiliary_();
00161         checkConsistency(primaryPrincipal->aux(), *secondaryAuxiliary);
00162         boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(secondaryAuxiliary, secondaryFileSequence_->fileProductRegistry(), processConfiguration(), secondaryRunPrincipal_));
00163         secondaryLumiPrincipal_ = secondaryFileSequence_->readLuminosityBlock_(lbp);
00164         checkHistoryConsistency(*primaryPrincipal, *secondaryLumiPrincipal_);
00165         primaryPrincipal->recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
00166       } else {
00167         throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
00168           << " Run " << primaryPrincipal->run()
00169           << " LuminosityBlock " << primaryPrincipal->luminosityBlock()
00170           << " is not found in the secondary input files\n";
00171       }
00172       return primaryPrincipal;
00173     }
00174     return primaryFileSequence_->readLuminosityBlock_(lbCache);
00175   }
00176 
00177   EventPrincipal*
00178   PoolSource::readEvent_() {
00179     EventSourceSentry(*this);
00180     EventPrincipal* primaryPrincipal = primaryFileSequence_->readEvent(*eventPrincipalCache(), luminosityBlockPrincipal());
00181     if(secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
00182       bool found = secondaryFileSequence_->skipToItem(primaryPrincipal->run(),
00183                                                       primaryPrincipal->luminosityBlock(),
00184                                                       primaryPrincipal->id().event());
00185       if(found) {
00186         EventPrincipal* secondaryPrincipal = secondaryFileSequence_->readEvent(*secondaryEventPrincipal_, secondaryLumiPrincipal_);
00187         checkConsistency(*primaryPrincipal, *secondaryPrincipal);
00188         checkHistoryConsistency(*primaryPrincipal, *secondaryPrincipal);
00189         primaryPrincipal->recombine(*secondaryPrincipal, branchIDsToReplace_[InEvent]);
00190         secondaryEventPrincipal_->clearPrincipal();
00191       } else {
00192         throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_") <<
00193           primaryPrincipal->id() << " is not found in the secondary input files\n";
00194       }
00195     }
00196     if(receiver_) {
00197       --numberOfEventsBeforeBigSkip_;
00198     }
00199     return primaryPrincipal;
00200   }
00201 
00202   EventPrincipal*
00203   PoolSource::readIt(EventID const& id) {
00204     bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
00205     if(!found) return 0;
00206     return readEvent_();
00207   }
00208 
00209   InputSource::ItemType
00210   PoolSource::getNextItemType() {
00211     if(receiver_ &&
00212        0 == numberOfEventsBeforeBigSkip_) {
00213       receiver_->receive();
00214       unsigned long toSkip = receiver_->numberToSkip();
00215       if(0 != toSkip) {
00216         primaryFileSequence_->skipEvents(toSkip, principalCache());
00217         decreaseRemainingEventsBy(toSkip);
00218       }
00219       numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00220       if(0 == numberOfEventsBeforeBigSkip_ or 0==remainingEvents()) {
00221         return IsStop;
00222       }
00223     }
00224     return primaryFileSequence_->getNextItemType();;
00225   }
00226 
00227   void
00228   PoolSource::preForkReleaseResources() {
00229     primaryFileSequence_->closeFile_();
00230   }
00231 
00232   void
00233   PoolSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource> iReceiver) {
00234     receiver_ = iReceiver;
00235     receiver_->receive();
00236     primaryFileSequence_->reset(principalCache());
00237     rewind();
00238     decreaseRemainingEventsBy(receiver_->numberToSkip());
00239   }
00240 
00241   // Rewind to before the first event that was read.
00242   void
00243   PoolSource::rewind_() {
00244     primaryFileSequence_->rewind_();
00245     if(receiver_) {
00246       unsigned int numberToSkip = receiver_->numberToSkip();
00247       if(0 != numberToSkip) {
00248         primaryFileSequence_->skipEvents(numberToSkip, principalCache());
00249       }
00250       numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00251     }
00252 
00253   }
00254 
00255   // Advance "offset" events.  Offset can be positive or negative (or zero).
00256   void
00257   PoolSource::skip(int offset) {
00258     primaryFileSequence_->skipEvents(offset, principalCache());
00259   }
00260 
00261   bool
00262   PoolSource::goToEvent_(EventID const& eventID) {
00263     return primaryFileSequence_->goToEvent(eventID);
00264   }
00265 
00266   EventPrincipal*
00267   PoolSource::readOneRandom() {
00268     assert(!secondaryFileSequence_);
00269     return primaryFileSequence_->readOneRandom();
00270   }
00271 
00272   EventPrincipal*
00273   PoolSource::readOneSequential() {
00274     assert(!secondaryFileSequence_);
00275     return primaryFileSequence_->readOneSequential();
00276   }
00277 
00278   EventPrincipal*
00279   PoolSource::readOneSpecified(EventID const& id) {
00280     assert(!secondaryFileSequence_);
00281     return primaryFileSequence_->readOneSpecified(id);
00282   }
00283 
00284   void
00285   PoolSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
00286     assert(!secondaryFileSequence_);
00287     primaryFileSequence_->dropUnwantedBranches_(wantedBranches);
00288   }
00289 
00290   void
00291   PoolSource::fillDescriptions(ConfigurationDescriptions & descriptions) {
00292 
00293     ParameterSetDescription desc;
00294 
00295     desc.setComment("Reads EDM/Root files.");
00296     VectorInputSource::fillDescription(desc);
00297     RootInputFileSequence::fillDescription(desc);
00298 
00299     descriptions.add("source", desc);
00300   }
00301 
00302   bool
00303   PoolSource::randomAccess_() const {
00304     return true;
00305   }
00306 
00307   ProcessingController::ForwardState
00308   PoolSource::forwardState_() const {
00309     return primaryFileSequence_->forwardState();
00310   }
00311 
00312   ProcessingController::ReverseState
00313   PoolSource::reverseState_() const {
00314     return primaryFileSequence_->reverseState();
00315   }
00316 }