CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/FWCore/Framework/src/InputSource.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "FWCore/Framework/interface/InputSource.h"
00004 
00005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
00006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00007 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00008 #include "DataFormats/Provenance/interface/FullHistoryToReducedHistoryMap.h"
00009 #include "FWCore/Framework/interface/Event.h"
00010 #include "FWCore/Framework/interface/EventPrincipal.h"
00011 #include "FWCore/Framework/interface/ExceptionHelpers.h"
00012 #include "FWCore/Framework/interface/FileBlock.h"
00013 #include "FWCore/Framework/interface/InputSourceDescription.h"
00014 #include "FWCore/Framework/interface/LuminosityBlock.h"
00015 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00016 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00017 #include "FWCore/Framework/interface/Run.h"
00018 #include "FWCore/Framework/interface/RunPrincipal.h"
00019 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00020 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00021 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00022 #include "FWCore/ServiceRegistry/interface/Service.h"
00023 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00025 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00026 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
00027 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00028 #include "FWCore/Utilities/interface/TimeOfDay.h"
00029 
00030 #include <cassert>
00031 #include <fstream>
00032 #include <iomanip>
00033 
00034 namespace edm {
00035 
00036   namespace {
00037         std::string const& suffix(int count) {
00038           static std::string const st("st");
00039           static std::string const nd("nd");
00040           static std::string const rd("rd");
00041           static std::string const th("th");
00042           // *0, *4 - *9 use "th".
00043           int lastDigit = count % 10;
00044           if(lastDigit >= 4 || lastDigit == 0) return th;
00045           // *11, *12, or *13 use "th".
00046           if(count % 100 - lastDigit == 10) return th;
00047           return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
00048         }
00049         template <typename T>
00050         boost::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
00051           return boost::shared_ptr<T>(ptr, do_nothing_deleter());
00052         }
00053   }
00054 
00055   InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00056       ProductRegistryHelper(),
00057       actReg_(desc.actReg_),
00058       maxEvents_(desc.maxEvents_),
00059       remainingEvents_(maxEvents_),
00060       maxLumis_(desc.maxLumis_),
00061       remainingLumis_(maxLumis_),
00062       readCount_(0),
00063       processingMode_(RunsLumisAndEvents),
00064       moduleDescription_(desc.moduleDescription_),
00065       productRegistry_(createSharedPtrToStatic<ProductRegistry const>(desc.productRegistry_)),
00066       branchIDListHelper_(desc.branchIDListHelper_),
00067       primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
00068       processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
00069       time_(),
00070       newRun_(true),
00071       newLumi_(true),
00072       eventCached_(false),
00073       state_(IsInvalid),
00074       runAuxiliary_(),
00075       lumiAuxiliary_(),
00076       statusFileName_(),
00077       receiver_(),
00078       numberOfEventsBeforeBigSkip_(0) {
00079 
00080     if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
00081       std::ostringstream statusfilename;
00082       statusfilename << "source_" << getpid();
00083       statusFileName_ = statusfilename.str();
00084     }
00085 
00086     // Secondary input sources currently do not have a product registry.
00087     if(primary_) {
00088       assert(desc.productRegistry_ != 0);
00089     }
00090     std::string const defaultMode("RunsLumisAndEvents");
00091     std::string const runMode("Runs");
00092     std::string const runLumiMode("RunsAndLumis");
00093 
00094     // The default value provided as the second argument to the getUntrackedParameter function call
00095     // is not used when the ParameterSet has been validated and the parameters are not optional
00096     // in the description.  As soon as all primary input sources and all modules with a secondary
00097     // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
00098     // calls can and should be deleted from the code.
00099     std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
00100     if(processingMode == runMode) {
00101       processingMode_ = Runs;
00102     } else if(processingMode == runLumiMode) {
00103       processingMode_ = RunsAndLumis;
00104     } else if(processingMode != defaultMode) {
00105       throw Exception(errors::Configuration)
00106         << "InputSource::InputSource()\n"
00107         << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
00108         << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
00109     }
00110   }
00111 
00112   InputSource::~InputSource() {}
00113 
00114   void
00115   InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
00116     ParameterSetDescription desc;
00117     desc.setUnknown();
00118     descriptions.addDefault(desc);
00119   }
00120 
00121   void
00122   InputSource::prevalidate(ConfigurationDescriptions& ) {
00123   }
00124   
00125 
00126   static std::string const kBaseType("Source");
00127 
00128   std::string const&
00129   InputSource::baseType() {
00130     return kBaseType;
00131   }
00132 
00133   void
00134   InputSource::fillDescription(ParameterSetDescription& desc) {
00135     std::string defaultString("RunsLumisAndEvents");
00136     desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
00137     "'RunsLumisAndEvents': process runs, lumis, and events.\n"
00138     "'RunsAndLumis':       process runs and lumis (not events).\n"
00139     "'Runs':               process runs (not lumis or events).");
00140     desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
00141   }
00142 
00143   bool
00144   InputSource::skipForForking() {
00145     if(eventLimitReached()) {
00146       return false;
00147     }
00148     if(receiver_ && 0 == numberOfEventsBeforeBigSkip_) {
00149       receiver_->receive();
00150       unsigned long toSkip = receiver_->numberToSkip();
00151       if(0 != toSkip) {
00152         skipEvents(toSkip);
00153         decreaseRemainingEventsBy(toSkip);
00154       }
00155       numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00156       if(0 == numberOfEventsBeforeBigSkip_ or 0 == remainingEvents() or 0 == remainingLuminosityBlocks()) {
00157         return false;
00158       }
00159     }
00160     return true;
00161   }
00162 
00163   // This next function is to guarantee that "runs only" mode does not return events or lumis,
00164   // and that "runs and lumis only" mode does not return events.
00165   // For input sources that are not random access (e.g. you need to read through the events
00166   // to get to the lumis and runs), this is all that is involved to implement these modes.
00167   // For input sources where events or lumis can be skipped, getNextItemType() should
00168   // implement the skipping internally, so that the performance gain is realized.
00169   // If this is done for a source, the 'if' blocks in this function will never be entered
00170   // for that source.
00171   InputSource::ItemType
00172   InputSource::nextItemType_() {
00173     ItemType itemType = callWithTryCatchAndPrint<ItemType>( [this](){ return getNextItemType(); }, "Calling InputSource::getNextItemType" );
00174 
00175     if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
00176       skipEvents(1);
00177       return nextItemType_();
00178     }
00179     if(itemType == IsLumi && processingMode() == Runs) {
00180       // QQQ skipLuminosityBlock_();
00181       return nextItemType_();
00182     }
00183     return itemType;
00184   }
00185 
00186   InputSource::ItemType
00187   InputSource::nextItemType() {
00188     ItemType oldState = state_;
00189     if(eventLimitReached()) {
00190       // If the maximum event limit has been reached, stop.
00191       state_ = IsStop;
00192     } else if(lumiLimitReached()) {
00193       // If the maximum lumi limit has been reached, stop
00194       // when reaching a new file, run, or lumi.
00195       if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
00196         state_ = IsStop;
00197       } else {
00198         ItemType newState = nextItemType_();
00199         if(newState == IsEvent) {
00200           assert (processingMode() == RunsLumisAndEvents);
00201           state_ = IsEvent;
00202         } else {
00203           state_ = IsStop;
00204         }
00205       }
00206     } else {
00207       ItemType newState = nextItemType_();
00208       if(newState == IsStop) {
00209         state_ = IsStop;
00210       } else if(newState == IsFile || oldState == IsInvalid) {
00211         state_ = IsFile;
00212       } else if(newState == IsRun || oldState == IsFile) {
00213         runAuxiliary_ = readRunAuxiliary();
00214         state_ = IsRun;
00215       } else if(newState == IsLumi || oldState == IsRun) {
00216         assert (processingMode() != Runs);
00217         lumiAuxiliary_ = readLuminosityBlockAuxiliary();
00218         state_ = IsLumi;
00219       } else {
00220         assert (processingMode() == RunsLumisAndEvents);
00221         state_ = IsEvent;
00222       }
00223     }
00224     if(state_ == IsStop) {
00225       lumiAuxiliary_.reset();
00226       runAuxiliary_.reset();
00227     }
00228     return state_;
00229   }
00230 
00231   boost::shared_ptr<LuminosityBlockAuxiliary>
00232   InputSource::readLuminosityBlockAuxiliary() {
00233     return callWithTryCatchAndPrint<boost::shared_ptr<LuminosityBlockAuxiliary> >( [this](){ return readLuminosityBlockAuxiliary_(); },
00234                                                                                    "Calling InputSource::readLuminosityBlockAuxiliary_" );
00235   }
00236 
00237   boost::shared_ptr<RunAuxiliary>
00238   InputSource::readRunAuxiliary() {
00239     return callWithTryCatchAndPrint<boost::shared_ptr<RunAuxiliary> >( [this](){ return readRunAuxiliary_(); },
00240                                                                        "Calling InputSource::readRunAuxiliary_" );
00241   }
00242 
00243   void
00244   InputSource::doBeginJob() {
00245     this->beginJob();
00246   }
00247 
00248   void
00249   InputSource::doEndJob() {
00250     endJob();
00251   }
00252 
00253   void
00254   InputSource::registerProducts() {
00255     if(!typeLabelList().empty()) {
00256       addToRegistry(typeLabelList().begin(), typeLabelList().end(), moduleDescription(), productRegistryUpdate());
00257     }
00258   }
00259 
00260   // Return a dummy file block.
00261   std::unique_ptr<FileBlock>
00262   InputSource::readFile() {
00263     assert(state_ == IsFile);
00264     assert(!limitReached());
00265     return callWithTryCatchAndPrint<std::unique_ptr<FileBlock> >( [this](){ return readFile_(); },
00266                                                                   "Calling InputSource::readFile_" );
00267   }
00268 
00269   void
00270   InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
00271     if(fb != nullptr) fb->close();
00272     callWithTryCatchAndPrint<void>( [this](){ closeFile_(); },
00273                                     "Calling InputSource::closeFile_",
00274                                     cleaningUpAfterException );
00275     return;
00276   }
00277 
00278   // Return a dummy file block.
00279   // This function must be overridden for any input source that reads a file
00280   // containing Products.
00281   std::unique_ptr<FileBlock>
00282   InputSource::readFile_() {
00283     return std::unique_ptr<FileBlock>(new FileBlock);
00284   }
00285 
00286   boost::shared_ptr<RunPrincipal>
00287   InputSource::readAndCacheRun(HistoryAppender& historyAppender) {
00288     RunSourceSentry sentry(*this);
00289     boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(runAuxiliary(), productRegistry_, processConfiguration(), &historyAppender));
00290     callWithTryCatchAndPrint<boost::shared_ptr<RunPrincipal> >( [this,&rp](){ return readRun_(rp); }, "Calling InputSource::readRun_" );
00291     return rp;
00292   }
00293 
00294   void
00295   InputSource::readAndMergeRun(boost::shared_ptr<RunPrincipal> rp) {
00296     RunSourceSentry sentry(*this);
00297     callWithTryCatchAndPrint<boost::shared_ptr<RunPrincipal> >( [this,&rp](){ return readRun_(rp); }, "Calling InputSource::readRun_" );
00298   }
00299 
00300   boost::shared_ptr<LuminosityBlockPrincipal>
00301   InputSource::readAndCacheLumi(HistoryAppender& historyAppender) {
00302     LumiSourceSentry sentry(*this);
00303     boost::shared_ptr<LuminosityBlockPrincipal> lbp(
00304       new LuminosityBlockPrincipal(luminosityBlockAuxiliary(),
00305                                    productRegistry_,
00306                                    processConfiguration(),
00307                                    &historyAppender));
00308     callWithTryCatchAndPrint<boost::shared_ptr<LuminosityBlockPrincipal> >( [this,&lbp](){ return readLuminosityBlock_(lbp); },
00309                                                                             "Calling InputSource::readLuminosityBlock_" );
00310     if(remainingLumis_ > 0) {
00311       --remainingLumis_;
00312     }
00313     return lbp;
00314   }
00315 
00316   void
00317   InputSource::readAndMergeLumi(boost::shared_ptr<LuminosityBlockPrincipal> lbp) {
00318     LumiSourceSentry sentry(*this);
00319     callWithTryCatchAndPrint<boost::shared_ptr<LuminosityBlockPrincipal> >( [this,&lbp](){ return readLuminosityBlock_(lbp); },
00320                                                                             "Calling InputSource::readLuminosityBlock_" );
00321     if(remainingLumis_ > 0) {
00322       --remainingLumis_;
00323     }
00324   }
00325 
00326   boost::shared_ptr<RunPrincipal>
00327   InputSource::readRun_(boost::shared_ptr<RunPrincipal> runPrincipal) {
00328     // Note: For the moment, we do not support saving and restoring the state of the
00329     // random number generator if random numbers are generated during processing of runs
00330     // (e.g. beginRun(), endRun())
00331     runPrincipal->fillRunPrincipal();
00332     return runPrincipal;
00333   }
00334 
00335   boost::shared_ptr<LuminosityBlockPrincipal>
00336   InputSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lumiPrincipal) {
00337     lumiPrincipal->fillLuminosityBlockPrincipal();
00338     return lumiPrincipal;
00339   }
00340 
00341   EventPrincipal*
00342   InputSource::readEvent(EventPrincipal& ep) {
00343     assert(state_ == IsEvent);
00344     assert(!eventLimitReached());
00345 
00346     EventPrincipal* result = callWithTryCatchAndPrint<EventPrincipal*>( [this,&ep](){ return readEvent_(ep); }, "Calling InputSource::readEvent_" );
00347     if(receiver_) {
00348       --numberOfEventsBeforeBigSkip_;
00349     }
00350 
00351     if(result != 0) {
00352       Event event(*result, moduleDescription());
00353       postRead(event);
00354       if(remainingEvents_ > 0) --remainingEvents_;
00355       ++readCount_;
00356       setTimestamp(result->time());
00357       issueReports(result->id());
00358     }
00359     return result;
00360   }
00361 
00362   EventPrincipal*
00363   InputSource::readEvent(EventPrincipal& ep, EventID const& eventID) {
00364     EventPrincipal* result = 0;
00365 
00366     if(!limitReached()) {
00367       //result = callWithTryCatchAndPrint<EventPrincipal*>( [this,ep,&eventID](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
00368       result = readIt(eventID, ep);
00369 
00370       if(result != 0) {
00371         Event event(*result, moduleDescription());
00372         postRead(event);
00373         if(remainingEvents_ > 0) --remainingEvents_;
00374         ++readCount_;
00375         issueReports(result->id());
00376       }
00377     }
00378     return result;
00379   }
00380 
00381   void
00382   InputSource::skipEvents(int offset) {
00383     callWithTryCatchAndPrint<void>( [this,&offset](){ skip(offset); }, "Calling InputSource::skip" );
00384   }
00385 
00386   bool
00387   InputSource::goToEvent(EventID const& eventID) {
00388     return callWithTryCatchAndPrint<bool>( [this,&eventID](){ return goToEvent_(eventID); }, "Calling InputSource::goToEvent_" );
00389   }
00390 
00391   void
00392   InputSource::rewind() {
00393     state_ = IsInvalid;
00394     remainingEvents_ = maxEvents_;
00395     setNewRun();
00396     setNewLumi();
00397     resetEventCached();
00398     callWithTryCatchAndPrint<void>( [this](){ rewind_(); }, "Calling InputSource::rewind_" );
00399     if(receiver_) {
00400       unsigned int numberToSkip = receiver_->numberToSkip();
00401       skip(numberToSkip);
00402       decreaseRemainingEventsBy(numberToSkip);
00403     }
00404   }
00405 
00406   void
00407   InputSource::issueReports(EventID const& eventID) {
00408     if(isInfoEnabled()) {
00409       LogVerbatim("FwkReport") << "Begin processing the " << readCount_
00410                                << suffix(readCount_) << " record. Run " << eventID.run()
00411                                << ", Event " << eventID.event()
00412                                << ", LumiSection " << eventID.luminosityBlock()
00413                                << " at " << std::setprecision(3) << TimeOfDay();
00414     }
00415     if(!statusFileName_.empty()) {
00416       std::ofstream statusFile(statusFileName_.c_str());
00417       statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00418       statusFile.close();
00419     }
00420 
00421     // At some point we may want to initiate checkpointing here
00422   }
00423 
00424   EventPrincipal*
00425   InputSource::readIt(EventID const&, EventPrincipal&) {
00426     throw Exception(errors::LogicError)
00427       << "InputSource::readIt()\n"
00428       << "Random access is not implemented for this type of Input Source\n"
00429       << "Contact a Framework Developer\n";
00430   }
00431 
00432   void
00433   InputSource::setRun(RunNumber_t) {
00434     throw Exception(errors::LogicError)
00435       << "InputSource::setRun()\n"
00436       << "Run number cannot be modified for this type of Input Source\n"
00437       << "Contact a Framework Developer\n";
00438   }
00439 
00440   void
00441   InputSource::setLumi(LuminosityBlockNumber_t) {
00442     throw Exception(errors::LogicError)
00443       << "InputSource::setLumi()\n"
00444       << "Luminosity Block ID cannot be modified for this type of Input Source\n"
00445       << "Contact a Framework Developer\n";
00446   }
00447 
00448   void
00449   InputSource::skip(int) {
00450     throw Exception(errors::LogicError)
00451       << "InputSource::skip()\n"
00452       << "Forking and random access are not implemented for this type of Input Source\n"
00453       << "Contact a Framework Developer\n";
00454   }
00455 
00456   bool
00457   InputSource::goToEvent_(EventID const&) {
00458     throw Exception(errors::LogicError)
00459       << "InputSource::goToEvent_()\n"
00460       << "Random access is not implemented for this type of Input Source\n"
00461       << "Contact a Framework Developer\n";
00462     return true;
00463   }
00464 
00465   void
00466   InputSource::rewind_() {
00467     throw Exception(errors::LogicError)
00468       << "InputSource::rewind()\n"
00469       << "Forking and random access are not implemented for this type of Input Source\n"
00470       << "Contact a Framework Developer\n";
00471   }
00472 
00473   void
00474   InputSource::decreaseRemainingEventsBy(int iSkipped) {
00475     if(-1 == remainingEvents_) {
00476       return;
00477     }
00478     if(iSkipped < remainingEvents_) {
00479       remainingEvents_ -= iSkipped;
00480     } else {
00481       remainingEvents_ = 0;
00482     }
00483   }
00484 
00485   void
00486   InputSource::postRead(Event& event) {
00487     Service<RandomNumberGenerator> rng;
00488     if(rng.isAvailable()) {
00489       rng->postEventRead(event);
00490     }
00491   }
00492 
00493   void
00494   InputSource::doBeginRun(RunPrincipal& rp) {
00495     Run run(rp, moduleDescription());
00496     callWithTryCatchAndPrint<void>( [this,&run](){ beginRun(run); }, "Calling InputSource::beginRun" );
00497     run.commit_();
00498   }
00499 
00500   void
00501   InputSource::doEndRun(RunPrincipal& rp, bool cleaningUpAfterException) {
00502     rp.setEndTime(time_);
00503     rp.setComplete();
00504     Run run(rp, moduleDescription());
00505     callWithTryCatchAndPrint<void>( [this,&run](){ endRun(run); }, "Calling InputSource::endRun", cleaningUpAfterException );
00506     run.commit_();
00507   }
00508 
00509   void
00510   InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp) {
00511     LuminosityBlock lb(lbp, moduleDescription());
00512     callWithTryCatchAndPrint<void>( [this,&lb](){ beginLuminosityBlock(lb); }, "Calling InputSource::beginLuminosityBlock" );
00513     lb.commit_();
00514   }
00515 
00516   void
00517   InputSource::doEndLumi(LuminosityBlockPrincipal& lbp, bool cleaningUpAfterException) {
00518     lbp.setEndTime(time_);
00519     lbp.setComplete();
00520     LuminosityBlock lb(lbp, moduleDescription());
00521     callWithTryCatchAndPrint<void>( [this,&lb](){ endLuminosityBlock(lb); }, "Calling InputSource::endLuminosityBlock", cleaningUpAfterException );
00522     lb.commit_();
00523   }
00524 
00525   void
00526   InputSource::doPreForkReleaseResources() {
00527     callWithTryCatchAndPrint<void>( [this](){ preForkReleaseResources(); }, "Calling InputSource::preForkReleaseResources" );
00528   }
00529 
00530   void
00531   InputSource::doPostForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00532     callWithTryCatchAndPrint<void>( [this, &iReceiver](){ postForkReacquireResources(iReceiver); },
00533                                     "Calling InputSource::postForkReacquireResources" );
00534   }
00535 
00536   bool
00537   InputSource::randomAccess() const {
00538     return callWithTryCatchAndPrint<bool>( [this](){ return randomAccess_(); },
00539                                            "Calling InputSource::randomAccess_" );
00540   }
00541 
00542   ProcessingController::ForwardState
00543   InputSource::forwardState() const {
00544     return callWithTryCatchAndPrint<ProcessingController::ForwardState>( [this](){ return forwardState_(); },
00545                                                                          "Calling InputSource::forwardState_" );
00546   }
00547 
00548   ProcessingController::ReverseState
00549   InputSource::reverseState() const {
00550     return callWithTryCatchAndPrint<ProcessingController::ReverseState>( [this](){ return reverseState_(); },
00551                                                                          "Calling InputSource::reverseState__" );
00552   }
00553 
00554   void
00555   InputSource::beginLuminosityBlock(LuminosityBlock&) {}
00556 
00557   void
00558   InputSource::endLuminosityBlock(LuminosityBlock&) {}
00559 
00560   void
00561   InputSource::beginRun(Run&) {}
00562 
00563   void
00564   InputSource::endRun(Run&) {}
00565 
00566   void
00567   InputSource::beginJob() {}
00568 
00569   void
00570   InputSource::endJob() {}
00571 
00572   void
00573   InputSource::preForkReleaseResources() {}
00574 
00575   void
00576   InputSource::postForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00577     receiver_ = iReceiver;
00578     receiver_->receive();
00579     numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
00580     rewind();
00581   }
00582 
00583   bool
00584   InputSource::randomAccess_() const {
00585     return false;
00586   }
00587 
00588   ProcessingController::ForwardState
00589   InputSource::forwardState_() const {
00590     return ProcessingController::kUnknownForward;
00591   }
00592 
00593   ProcessingController::ReverseState
00594   InputSource::reverseState_() const {
00595     return ProcessingController::kUnknownReverse;
00596   }
00597 
00598   ProcessHistoryID const&
00599   InputSource::reducedProcessHistoryID() const {
00600     assert(runAuxiliary());
00601     return ProcessHistoryRegistry::instance()->extra().reduceProcessHistoryID(runAuxiliary()->processHistoryID());
00602   }
00603 
00604   RunNumber_t
00605   InputSource::run() const {
00606     assert(runAuxiliary());
00607     return runAuxiliary()->run();
00608   }
00609 
00610   LuminosityBlockNumber_t
00611   InputSource::luminosityBlock() const {
00612     assert(luminosityBlockAuxiliary());
00613     return luminosityBlockAuxiliary()->luminosityBlock();
00614   }
00615 
00616   InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
00617     pre();
00618   }
00619 
00620   InputSource::SourceSentry::~SourceSentry() {
00621     post_();
00622   }
00623 
00624   InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source) :
00625      sentry_(source.actReg()->preSourceSignal_, source.actReg()->postSourceSignal_) {
00626   }
00627 
00628   InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source) :
00629      sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
00630   }
00631 
00632   InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source) :
00633      sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
00634   }
00635 
00636   InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source) :
00637      sentry_(source.actReg()->preOpenFileSignal_, source.actReg()->postOpenFileSignal_) {
00638   }
00639 
00640   InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source) :
00641      post_(source.actReg()->postCloseFileSignal_) {
00642      source.actReg()->preCloseFileSignal_("", false);
00643   }
00644 
00645   InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source, std::string const& lfn, bool usedFallback) :
00646      post_(source.actReg()->postCloseFileSignal_) {
00647      source.actReg()->preCloseFileSignal_(lfn, usedFallback);
00648   }
00649 
00650   InputSource::FileCloseSentry::~FileCloseSentry() {
00651      post_();
00652   }
00653 }