CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_2_SLHC2/src/FWCore/Framework/src/InputSource.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "FWCore/Framework/interface/InputSource.h"
00004 
00005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
00006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00008 #include "DataFormats/Provenance/interface/FullHistoryToReducedHistoryMap.h"
00009 #include "FWCore/Framework/interface/Event.h"
00010 #include "FWCore/Framework/interface/EventPrincipal.h"
00011 #include "FWCore/Framework/interface/ExceptionHelpers.h"
00012 #include "FWCore/Framework/interface/FileBlock.h"
00013 #include "FWCore/Framework/interface/InputSourceDescription.h"
00014 #include "FWCore/Framework/interface/LuminosityBlock.h"
00015 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00016 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00017 #include "FWCore/Framework/interface/Run.h"
00018 #include "FWCore/Framework/interface/RunPrincipal.h"
00019 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00020 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00021 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00022 #include "FWCore/ServiceRegistry/interface/Service.h"
00023 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00025 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00026 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
00027 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00028 #include "FWCore/Utilities/interface/TimeOfDay.h"
00029 
00030 #include <cassert>
00031 #include <fstream>
00032 #include <iomanip>
00033 
00034 namespace edm {
00035 
00036   namespace {
00037         std::string const& suffix(int count) {
00038           static std::string const st("st");
00039           static std::string const nd("nd");
00040           static std::string const rd("rd");
00041           static std::string const th("th");
00042           // *0, *4 - *9 use "th".
00043           int lastDigit = count % 10;
00044           if(lastDigit >= 4 || lastDigit == 0) return th;
00045           // *11, *12, or *13 use "th".
00046           if(count % 100 - lastDigit == 10) return th;
00047           return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
00048         }
00049         template <typename T>
00050         boost::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
00051           return boost::shared_ptr<T>(ptr, do_nothing_deleter());
00052         }
00053   }
00054 
00055   InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00056       ProductRegistryHelper(),
00057       actReg_(desc.actReg_),
00058       maxEvents_(desc.maxEvents_),
00059       remainingEvents_(maxEvents_),
00060       maxLumis_(desc.maxLumis_),
00061       remainingLumis_(maxLumis_),
00062       readCount_(0),
00063       processingMode_(RunsLumisAndEvents),
00064       moduleDescription_(desc.moduleDescription_),
00065       productRegistry_(createSharedPtrToStatic<ProductRegistry const>(desc.productRegistry_)),
00066       branchIDListHelper_(desc.branchIDListHelper_),
00067       primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
00068       processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
00069       time_(),
00070       newRun_(true),
00071       newLumi_(true),
00072       eventCached_(false),
00073       state_(IsInvalid),
00074       runAuxiliary_(),
00075       lumiAuxiliary_(),
00076       statusFileName_(),
00077       receiver_(),
00078       numberOfEventsBeforeBigSkip_(0) {
00079 
00080     if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
00081       std::ostringstream statusfilename;
00082       statusfilename << "source_" << getpid();
00083       statusFileName_ = statusfilename.str();
00084     }
00085 
00086     // Secondary input sources currently do not have a product registry.
00087     if(primary_) {
00088       assert(desc.productRegistry_ != 0);
00089     }
00090     std::string const defaultMode("RunsLumisAndEvents");
00091     std::string const runMode("Runs");
00092     std::string const runLumiMode("RunsAndLumis");
00093 
00094     // The default value provided as the second argument to the getUntrackedParameter function call
00095     // is not used when the ParameterSet has been validated and the parameters are not optional
00096     // in the description.  As soon as all primary input sources and all modules with a secondary
00097     // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
00098     // calls can and should be deleted from the code.
00099     std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
00100     if(processingMode == runMode) {
00101       processingMode_ = Runs;
00102     } else if(processingMode == runLumiMode) {
00103       processingMode_ = RunsAndLumis;
00104     } else if(processingMode != defaultMode) {
00105       throw Exception(errors::Configuration)
00106         << "InputSource::InputSource()\n"
00107         << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
00108         << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
00109     }
00110   }
00111 
00112   InputSource::~InputSource() {}
00113 
00114   void
00115   InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
00116     ParameterSetDescription desc;
00117     desc.setUnknown();
00118     descriptions.addDefault(desc);
00119   }
00120 
00121   void
00122   InputSource::prevalidate(ConfigurationDescriptions& ) {
00123   }
00124   
00125 
00126   static std::string const kBaseType("Source");
00127 
00128   std::string const&
00129   InputSource::baseType() {
00130     return kBaseType;
00131   }
00132 
00133   void
00134   InputSource::fillDescription(ParameterSetDescription& desc) {
00135     std::string defaultString("RunsLumisAndEvents");
00136     desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
00137     "'RunsLumisAndEvents': process runs, lumis, and events.\n"
00138     "'RunsAndLumis':       process runs and lumis (not events).\n"
00139     "'Runs':               process runs (not lumis or events).");
00140     desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
00141   }
00142 
00143   bool
00144   InputSource::skipForForking() {
00145     if(eventLimitReached()) {
00146       return false;
00147     }
00148     if(receiver_ && 0 == numberOfEventsBeforeBigSkip_) {
00149       receiver_->receive();
00150       unsigned long toSkip = receiver_->numberToSkip();
00151       if(0 != toSkip) {
00152         skipEvents(toSkip);
00153         decreaseRemainingEventsBy(toSkip);
00154       }
00155       numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00156       if(0 == numberOfEventsBeforeBigSkip_ or 0 == remainingEvents() or 0 == remainingLuminosityBlocks()) {
00157         return false;
00158       }
00159     }
00160     return true;
00161   }
00162 
00163   // This next function is to guarantee that "runs only" mode does not return events or lumis,
00164   // and that "runs and lumis only" mode does not return events.
00165   // For input sources that are not random access (e.g. you need to read through the events
00166   // to get to the lumis and runs), this is all that is involved to implement these modes.
00167   // For input sources where events or lumis can be skipped, getNextItemType() should
00168   // implement the skipping internally, so that the performance gain is realized.
00169   // If this is done for a source, the 'if' blocks in this function will never be entered
00170   // for that source.
00171   InputSource::ItemType
00172   InputSource::nextItemType_() {
00173     ItemType itemType = callWithTryCatchAndPrint<ItemType>( [this](){ return getNextItemType(); }, "Calling InputSource::getNextItemType" );
00174 
00175     if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
00176       skipEvents(1);
00177       return nextItemType_();
00178     }
00179     if(itemType == IsLumi && processingMode() == Runs) {
00180       // QQQ skipLuminosityBlock_();
00181       return nextItemType_();
00182     }
00183     return itemType;
00184   }
00185 
00186   InputSource::ItemType
00187   InputSource::nextItemType() {
00188     ItemType oldState = state_;
00189     if(eventLimitReached()) {
00190       // If the maximum event limit has been reached, stop.
00191       state_ = IsStop;
00192     } else if(lumiLimitReached()) {
00193       // If the maximum lumi limit has been reached, stop
00194       // when reaching a new file, run, or lumi.
00195       if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
00196         state_ = IsStop;
00197       } else {
00198         ItemType newState = nextItemType_();
00199         if(newState == IsEvent) {
00200           assert (processingMode() == RunsLumisAndEvents);
00201           state_ = IsEvent;
00202         } else {
00203           state_ = IsStop;
00204         }
00205       }
00206     } else {
00207       ItemType newState = nextItemType_();
00208       if(newState == IsStop) {
00209         state_ = IsStop;
00210       } else if(newState == IsFile || oldState == IsInvalid) {
00211         state_ = IsFile;
00212       } else if(newState == IsRun || oldState == IsFile) {
00213         runAuxiliary_ = readRunAuxiliary();
00214         state_ = IsRun;
00215       } else if(newState == IsLumi || oldState == IsRun) {
00216         assert (processingMode() != Runs);
00217         lumiAuxiliary_ = readLuminosityBlockAuxiliary();
00218         state_ = IsLumi;
00219       } else {
00220         assert (processingMode() == RunsLumisAndEvents);
00221         state_ = IsEvent;
00222       }
00223     }
00224     if(state_ == IsStop) {
00225       lumiAuxiliary_.reset();
00226       runAuxiliary_.reset();
00227     }
00228     return state_;
00229   }
00230 
00231   boost::shared_ptr<LuminosityBlockAuxiliary>
00232   InputSource::readLuminosityBlockAuxiliary() {
00233     return callWithTryCatchAndPrint<boost::shared_ptr<LuminosityBlockAuxiliary> >( [this](){ return readLuminosityBlockAuxiliary_(); },
00234                                                                                    "Calling InputSource::readLuminosityBlockAuxiliary_" );
00235   }
00236 
00237   boost::shared_ptr<RunAuxiliary>
00238   InputSource::readRunAuxiliary() {
00239     return callWithTryCatchAndPrint<boost::shared_ptr<RunAuxiliary> >( [this](){ return readRunAuxiliary_(); },
00240                                                                        "Calling InputSource::readRunAuxiliary_" );
00241   }
00242 
00243   void
00244   InputSource::doBeginJob() {
00245     this->beginJob();
00246   }
00247 
00248   void
00249   InputSource::doEndJob() {
00250     endJob();
00251   }
00252 
00253   void
00254   InputSource::registerProducts() {
00255     if(!typeLabelList().empty()) {
00256       addToRegistry(typeLabelList().begin(), typeLabelList().end(), moduleDescription(), productRegistryUpdate());
00257     }
00258   }
00259 
00260   // Return a dummy file block.
00261   boost::shared_ptr<FileBlock>
00262   InputSource::readFile() {
00263     assert(state_ == IsFile);
00264     assert(!limitReached());
00265     boost::shared_ptr<FileBlock> fb = callWithTryCatchAndPrint<boost::shared_ptr<FileBlock> >( [this](){ return readFile_(); },
00266                                                                                                "Calling InputSource::readFile_" );
00267     return fb;
00268   }
00269 
00270   void
00271   InputSource::closeFile(boost::shared_ptr<FileBlock> fb, bool cleaningUpAfterException) {
00272     if(fb) fb->close();
00273     callWithTryCatchAndPrint<void>( [this](){ closeFile_(); },
00274                                     "Calling InputSource::closeFile_",
00275                                     cleaningUpAfterException );
00276     return;
00277   }
00278 
00279   // Return a dummy file block.
00280   // This function must be overridden for any input source that reads a file
00281   // containing Products.
00282   boost::shared_ptr<FileBlock>
00283   InputSource::readFile_() {
00284     return boost::shared_ptr<FileBlock>(new FileBlock);
00285   }
00286 
00287   boost::shared_ptr<RunPrincipal>
00288   InputSource::readAndCacheRun(HistoryAppender& historyAppender) {
00289     RunSourceSentry sentry(*this);
00290     boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(runAuxiliary(), productRegistry_, processConfiguration(), &historyAppender));
00291     callWithTryCatchAndPrint<boost::shared_ptr<RunPrincipal> >( [this,&rp](){ return readRun_(rp); }, "Calling InputSource::readRun_" );
00292     return rp;
00293   }
00294 
00295   void
00296   InputSource::readAndMergeRun(boost::shared_ptr<RunPrincipal> rp) {
00297     RunSourceSentry sentry(*this);
00298     callWithTryCatchAndPrint<boost::shared_ptr<RunPrincipal> >( [this,&rp](){ return readRun_(rp); }, "Calling InputSource::readRun_" );
00299   }
00300 
00301   boost::shared_ptr<LuminosityBlockPrincipal>
00302   InputSource::readAndCacheLumi(HistoryAppender& historyAppender) {
00303     LumiSourceSentry sentry(*this);
00304     boost::shared_ptr<LuminosityBlockPrincipal> lbp(
00305       new LuminosityBlockPrincipal(luminosityBlockAuxiliary(),
00306                                    productRegistry_,
00307                                    processConfiguration(),
00308                                    &historyAppender));
00309     callWithTryCatchAndPrint<boost::shared_ptr<LuminosityBlockPrincipal> >( [this,&lbp](){ return readLuminosityBlock_(lbp); },
00310                                                                             "Calling InputSource::readLuminosityBlock_" );
00311     if(remainingLumis_ > 0) {
00312       --remainingLumis_;
00313     }
00314     return lbp;
00315   }
00316 
00317   void
00318   InputSource::readAndMergeLumi(boost::shared_ptr<LuminosityBlockPrincipal> lbp) {
00319     LumiSourceSentry sentry(*this);
00320     callWithTryCatchAndPrint<boost::shared_ptr<LuminosityBlockPrincipal> >( [this,&lbp](){ return readLuminosityBlock_(lbp); },
00321                                                                             "Calling InputSource::readLuminosityBlock_" );
00322     if(remainingLumis_ > 0) {
00323       --remainingLumis_;
00324     }
00325   }
00326 
00327   boost::shared_ptr<RunPrincipal>
00328   InputSource::readRun_(boost::shared_ptr<RunPrincipal> runPrincipal) {
00329     // Note: For the moment, we do not support saving and restoring the state of the
00330     // random number generator if random numbers are generated during processing of runs
00331     // (e.g. beginRun(), endRun())
00332     runPrincipal->fillRunPrincipal();
00333     return runPrincipal;
00334   }
00335 
00336   boost::shared_ptr<LuminosityBlockPrincipal>
00337   InputSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lumiPrincipal) {
00338     lumiPrincipal->fillLuminosityBlockPrincipal();
00339     return lumiPrincipal;
00340   }
00341 
00342   EventPrincipal*
00343   InputSource::readEvent(EventPrincipal& ep) {
00344     assert(state_ == IsEvent);
00345     assert(!eventLimitReached());
00346 
00347     EventPrincipal* result = callWithTryCatchAndPrint<EventPrincipal*>( [this,&ep](){ return readEvent_(ep); }, "Calling InputSource::readEvent_" );
00348     if(receiver_) {
00349       --numberOfEventsBeforeBigSkip_;
00350     }
00351 
00352     if(result != 0) {
00353       Event event(*result, moduleDescription());
00354       postRead(event);
00355       if(remainingEvents_ > 0) --remainingEvents_;
00356       ++readCount_;
00357       setTimestamp(result->time());
00358       issueReports(result->id());
00359     }
00360     return result;
00361   }
00362 
00363   EventPrincipal*
00364   InputSource::readEvent(EventPrincipal& ep, EventID const& eventID) {
00365     EventPrincipal* result = 0;
00366 
00367     if(!limitReached()) {
00368       //result = callWithTryCatchAndPrint<EventPrincipal*>( [this,ep,&eventID](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
00369       result = readIt(eventID, ep);
00370 
00371       if(result != 0) {
00372         Event event(*result, moduleDescription());
00373         postRead(event);
00374         if(remainingEvents_ > 0) --remainingEvents_;
00375         ++readCount_;
00376         issueReports(result->id());
00377       }
00378     }
00379     return result;
00380   }
00381 
00382   void
00383   InputSource::skipEvents(int offset) {
00384     callWithTryCatchAndPrint<void>( [this,&offset](){ skip(offset); }, "Calling InputSource::skip" );
00385   }
00386 
00387   bool
00388   InputSource::goToEvent(EventID const& eventID) {
00389     return callWithTryCatchAndPrint<bool>( [this,&eventID](){ return goToEvent_(eventID); }, "Calling InputSource::goToEvent_" );
00390   }
00391 
00392   void
00393   InputSource::rewind() {
00394     state_ = IsInvalid;
00395     remainingEvents_ = maxEvents_;
00396     setNewRun();
00397     setNewLumi();
00398     resetEventCached();
00399     callWithTryCatchAndPrint<void>( [this](){ rewind_(); }, "Calling InputSource::rewind_" );
00400     if(receiver_) {
00401       unsigned int numberToSkip = receiver_->numberToSkip();
00402       skip(numberToSkip);
00403       decreaseRemainingEventsBy(numberToSkip);
00404     }
00405   }
00406 
00407   void
00408   InputSource::issueReports(EventID const& eventID) {
00409     if(isInfoEnabled()) {
00410       LogVerbatim("FwkReport") << "Begin processing the " << readCount_
00411                                << suffix(readCount_) << " record. Run " << eventID.run()
00412                                << ", Event " << eventID.event()
00413                                << ", LumiSection " << eventID.luminosityBlock()
00414                                << " at " << std::setprecision(3) << TimeOfDay();
00415     }
00416     if(!statusFileName_.empty()) {
00417       std::ofstream statusFile(statusFileName_.c_str());
00418       statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00419       statusFile.close();
00420     }
00421 
00422     // At some point we may want to initiate checkpointing here
00423   }
00424 
00425   EventPrincipal*
00426   InputSource::readIt(EventID const&, EventPrincipal&) {
00427     throw Exception(errors::LogicError)
00428       << "InputSource::readIt()\n"
00429       << "Random access is not implemented for this type of Input Source\n"
00430       << "Contact a Framework Developer\n";
00431   }
00432 
00433   void
00434   InputSource::setRun(RunNumber_t) {
00435     throw Exception(errors::LogicError)
00436       << "InputSource::setRun()\n"
00437       << "Run number cannot be modified for this type of Input Source\n"
00438       << "Contact a Framework Developer\n";
00439   }
00440 
00441   void
00442   InputSource::setLumi(LuminosityBlockNumber_t) {
00443     throw Exception(errors::LogicError)
00444       << "InputSource::setLumi()\n"
00445       << "Luminosity Block ID cannot be modified for this type of Input Source\n"
00446       << "Contact a Framework Developer\n";
00447   }
00448 
00449   void
00450   InputSource::skip(int) {
00451     throw Exception(errors::LogicError)
00452       << "InputSource::skip()\n"
00453       << "Forking and random access are not implemented for this type of Input Source\n"
00454       << "Contact a Framework Developer\n";
00455   }
00456 
00457   bool
00458   InputSource::goToEvent_(EventID const&) {
00459     throw Exception(errors::LogicError)
00460       << "InputSource::goToEvent_()\n"
00461       << "Random access is not implemented for this type of Input Source\n"
00462       << "Contact a Framework Developer\n";
00463     return true;
00464   }
00465 
00466   void
00467   InputSource::rewind_() {
00468     throw Exception(errors::LogicError)
00469       << "InputSource::rewind()\n"
00470       << "Forking and random access are not implemented for this type of Input Source\n"
00471       << "Contact a Framework Developer\n";
00472   }
00473 
00474   void
00475   InputSource::decreaseRemainingEventsBy(int iSkipped) {
00476     if(-1 == remainingEvents_) {
00477       return;
00478     }
00479     if(iSkipped < remainingEvents_) {
00480       remainingEvents_ -= iSkipped;
00481     } else {
00482       remainingEvents_ = 0;
00483     }
00484   }
00485 
00486   void
00487   InputSource::postRead(Event& event) {
00488     Service<RandomNumberGenerator> rng;
00489     if(rng.isAvailable()) {
00490       rng->postEventRead(event);
00491     }
00492   }
00493 
00494   void
00495   InputSource::doBeginRun(RunPrincipal& rp) {
00496     Run run(rp, moduleDescription());
00497     callWithTryCatchAndPrint<void>( [this,&run](){ beginRun(run); }, "Calling InputSource::beginRun" );
00498     run.commit_();
00499   }
00500 
00501   void
00502   InputSource::doEndRun(RunPrincipal& rp, bool cleaningUpAfterException) {
00503     rp.setEndTime(time_);
00504     Run run(rp, moduleDescription());
00505     callWithTryCatchAndPrint<void>( [this,&run](){ endRun(run); }, "Calling InputSource::endRun", cleaningUpAfterException );
00506     run.commit_();
00507   }
00508 
00509   void
00510   InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp) {
00511     LuminosityBlock lb(lbp, moduleDescription());
00512     callWithTryCatchAndPrint<void>( [this,&lb](){ beginLuminosityBlock(lb); }, "Calling InputSource::beginLuminosityBlock" );
00513     lb.commit_();
00514   }
00515 
00516   void
00517   InputSource::doEndLumi(LuminosityBlockPrincipal& lbp, bool cleaningUpAfterException) {
00518     lbp.setEndTime(time_);
00519     LuminosityBlock lb(lbp, moduleDescription());
00520     callWithTryCatchAndPrint<void>( [this,&lb](){ endLuminosityBlock(lb); }, "Calling InputSource::endLuminosityBlock", cleaningUpAfterException );
00521     lb.commit_();
00522   }
00523 
00524   void
00525   InputSource::doPreForkReleaseResources() {
00526     callWithTryCatchAndPrint<void>( [this](){ preForkReleaseResources(); }, "Calling InputSource::preForkReleaseResources" );
00527   }
00528 
00529   void
00530   InputSource::doPostForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00531     callWithTryCatchAndPrint<void>( [this, &iReceiver](){ postForkReacquireResources(iReceiver); },
00532                                     "Calling InputSource::postForkReacquireResources" );
00533   }
00534 
00535   bool
00536   InputSource::randomAccess() const {
00537     return callWithTryCatchAndPrint<bool>( [this](){ return randomAccess_(); },
00538                                            "Calling InputSource::randomAccess_" );
00539   }
00540 
00541   ProcessingController::ForwardState
00542   InputSource::forwardState() const {
00543     return callWithTryCatchAndPrint<ProcessingController::ForwardState>( [this](){ return forwardState_(); },
00544                                                                          "Calling InputSource::forwardState_" );
00545   }
00546 
00547   ProcessingController::ReverseState
00548   InputSource::reverseState() const {
00549     return callWithTryCatchAndPrint<ProcessingController::ReverseState>( [this](){ return reverseState_(); },
00550                                                                          "Calling InputSource::reverseState__" );
00551   }
00552 
00553   void
00554   InputSource::beginLuminosityBlock(LuminosityBlock&) {}
00555 
00556   void
00557   InputSource::endLuminosityBlock(LuminosityBlock&) {}
00558 
00559   void
00560   InputSource::beginRun(Run&) {}
00561 
00562   void
00563   InputSource::endRun(Run&) {}
00564 
00565   void
00566   InputSource::beginJob() {}
00567 
00568   void
00569   InputSource::endJob() {}
00570 
00571   void
00572   InputSource::preForkReleaseResources() {}
00573 
00574   void
00575   InputSource::postForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00576     receiver_ = iReceiver;
00577     receiver_->receive();
00578     numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00579     rewind();
00580   }
00581 
00582   bool
00583   InputSource::randomAccess_() const {
00584     return false;
00585   }
00586 
00587   ProcessingController::ForwardState
00588   InputSource::forwardState_() const {
00589     return ProcessingController::kUnknownForward;
00590   }
00591 
00592   ProcessingController::ReverseState
00593   InputSource::reverseState_() const {
00594     return ProcessingController::kUnknownReverse;
00595   }
00596 
00597   ProcessHistoryID const&
00598   InputSource::reducedProcessHistoryID() const {
00599     assert(runAuxiliary());
00600     return ProcessHistoryRegistry::instance()->extra().reduceProcessHistoryID(runAuxiliary()->processHistoryID());
00601   }
00602 
00603   RunNumber_t
00604   InputSource::run() const {
00605     assert(runAuxiliary());
00606     return runAuxiliary()->run();
00607   }
00608 
00609   LuminosityBlockNumber_t
00610   InputSource::luminosityBlock() const {
00611     assert(luminosityBlockAuxiliary());
00612     return luminosityBlockAuxiliary()->luminosityBlock();
00613   }
00614 
00615   InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
00616     pre();
00617   }
00618 
00619   InputSource::SourceSentry::~SourceSentry() {
00620     post_();
00621   }
00622 
00623   InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source) :
00624      sentry_(source.actReg()->preSourceSignal_, source.actReg()->postSourceSignal_) {
00625   }
00626 
00627   InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source) :
00628      sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
00629   }
00630 
00631   InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source) :
00632      sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
00633   }
00634 
00635   InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source) :
00636      sentry_(source.actReg()->preOpenFileSignal_, source.actReg()->postOpenFileSignal_) {
00637   }
00638 
00639   InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source) :
00640      post_(source.actReg()->postCloseFileSignal_) {
00641      source.actReg()->preCloseFileSignal_("", false);
00642   }
00643 
00644   InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source, std::string const& lfn, bool usedFallback) :
00645      post_(source.actReg()->postCloseFileSignal_) {
00646      source.actReg()->preCloseFileSignal_(lfn, usedFallback);
00647   }
00648 
00649   InputSource::FileCloseSentry::~FileCloseSentry() {
00650      post_();
00651   }
00652 }