CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/FWCore/Framework/src/InputSource.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "FWCore/Framework/interface/InputSource.h"
00004 
00005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
00006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00007 #include "FWCore/Framework/interface/Event.h"
00008 #include "FWCore/Framework/interface/EventPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "FWCore/Framework/interface/InputSourceDescription.h"
00011 #include "FWCore/Framework/interface/LuminosityBlock.h"
00012 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00013 #include "FWCore/Framework/interface/Run.h"
00014 #include "FWCore/Framework/interface/RunPrincipal.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00017 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00018 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00019 #include "FWCore/ServiceRegistry/interface/Service.h"
00020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00021 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00022 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00023 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
00024 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00025 #include "FWCore/Utilities/interface/TimeOfDay.h"
00026 
00027 #include <cassert>
00028 #include <fstream>
00029 #include <iomanip>
00030 
00031 namespace edm {
00032 
00033   namespace {
00034         std::string const& suffix(int count) {
00035           static std::string const st("st");
00036           static std::string const nd("nd");
00037           static std::string const rd("rd");
00038           static std::string const th("th");
00039           // *0, *4 - *9 use "th".
00040           int lastDigit = count % 10;
00041           if(lastDigit >= 4 || lastDigit == 0) return th;
00042           // *11, *12, or *13 use "th".
00043           if(count % 100 - lastDigit == 10) return th;
00044           return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
00045         }
00046         template <typename T>
00047         boost::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
00048           return boost::shared_ptr<T>(ptr, do_nothing_deleter());
00049         }
00050 
00051         ProcessHistoryID
00052         deleteFromProcessHistory(ProcessHistoryID const& phid, std::string const& processName) {
00053         // Delete the current process from the process history.  This must be done to maintain consistency
00054         // for runs or lumis when the principal cache is flushed, because the process history modified flag,
00055         // stored in the principal, is lost when the cache is flushed.
00056           if(!phid.isValid()) {
00057             return phid;
00058           }
00059           ProcessHistory ph;
00060           bool found = ProcessHistoryRegistry::instance()->getMapped(phid, ph);
00061           assert(found);
00062           ProcessHistory newPH;
00063           newPH.reserve(ph.size());
00064           for(ProcessHistory::const_iterator it = ph.begin(), itEnd = ph.end(); it != itEnd; ++it) {
00065             if(processName != it->processName()) {
00066               newPH.push_back(*it);
00067             }
00068           }
00069           ProcessHistoryRegistry::instance()->insertMapped(newPH);
00070           return newPH.id();
00071         }
00072   }
00073 
00074   InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00075       ProductRegistryHelper(),
00076       actReg_(desc.actReg_),
00077       principalCache_(desc.principalCache_),
00078       maxEvents_(desc.maxEvents_),
00079       remainingEvents_(maxEvents_),
00080       maxLumis_(desc.maxLumis_),
00081       remainingLumis_(maxLumis_),
00082       readCount_(0),
00083       processingMode_(RunsLumisAndEvents),
00084       moduleDescription_(desc.moduleDescription_),
00085       productRegistry_(createSharedPtrToStatic<ProductRegistry const>(desc.productRegistry_)),
00086       primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
00087       processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
00088       time_(),
00089       doneReadAhead_(false),
00090       state_(IsInvalid),
00091       runAuxiliary_(),
00092       lumiAuxiliary_(),
00093       runPrematurelyRead_(false),
00094       lumiPrematurelyRead_(false),
00095       statusFileName_() {
00096 
00097     if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
00098       std::ostringstream statusfilename;
00099       statusfilename << "source_" << getpid();
00100       statusFileName_ = statusfilename.str();
00101     }
00102 
00103     // Secondary input sources currently do not have a product registry.
00104     if(primary_) {
00105       assert(desc.productRegistry_ != 0);
00106     }
00107     std::string const defaultMode("RunsLumisAndEvents");
00108     std::string const runMode("Runs");
00109     std::string const runLumiMode("RunsAndLumis");
00110 
00111     // The default value provided as the second argument to the getUntrackedParameter function call
00112     // is not used when the ParameterSet has been validated and the parameters are not optional
00113     // in the description.  As soon as all primary input sources and all modules with a secondary
00114     // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
00115     // calls can and should be deleted from the code.
00116     std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
00117     if(processingMode == runMode) {
00118       processingMode_ = Runs;
00119     } else if(processingMode == runLumiMode) {
00120       processingMode_ = RunsAndLumis;
00121     } else if(processingMode != defaultMode) {
00122       throw Exception(errors::Configuration)
00123         << "InputSource::InputSource()\n"
00124         << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
00125         << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
00126     }
00127   }
00128 
00129   InputSource::~InputSource() {}
00130 
00131   void
00132   InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
00133     ParameterSetDescription desc;
00134     desc.setUnknown();
00135     descriptions.addDefault(desc);
00136   }
00137 
00138   static std::string const kBaseType("Source");
00139 
00140   std::string const&
00141   InputSource::baseType() {
00142     return kBaseType;
00143   }
00144 
00145   void
00146   InputSource::fillDescription(ParameterSetDescription& desc) {
00147     std::string defaultString("RunsLumisAndEvents");
00148     desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
00149     "'RunsLumisAndEvents': process runs, lumis, and events.\n"
00150     "'RunsAndLumis':       process runs and lumis (not events).\n"
00151     "'Runs':               process runs (not lumis or events).");
00152     desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
00153   }
00154 
00155   EventPrincipal* const
00156   InputSource::eventPrincipalCache() {
00157     return &principalCache().eventPrincipal();
00158   }
00159 
00160   // This next function is to guarantee that "runs only" mode does not return events or lumis,
00161   // and that "runs and lumis only" mode does not return events.
00162   // For input sources that are not random access (e.g. you need to read through the events
00163   // to get to the lumis and runs), this is all that is involved to implement these modes.
00164   // For input sources where events or lumis can be skipped, getNextItemType() should
00165   // implement the skipping internally, so that the performance gain is realized.
00166   // If this is done for a source, the 'if' blocks in this function will never be entered
00167   // for that source.
00168   InputSource::ItemType
00169   InputSource::nextItemType_() {
00170     ItemType itemType = getNextItemType();
00171     if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
00172       readEvent_();
00173       return nextItemType_();
00174     }
00175     if(itemType == IsLumi && processingMode() == Runs) {
00176       // QQQ skipLuminosityBlock_();
00177       return nextItemType_();
00178     }
00179     return itemType;
00180   }
00181 
00182   InputSource::ItemType
00183   InputSource::nextItemType() {
00184     if(doneReadAhead_) {
00185       return state_;
00186     }
00187     doneReadAhead_ = true;
00188     ItemType oldState = state_;
00189     if(eventLimitReached()) {
00190       // If the maximum event limit has been reached, stop.
00191       state_ = IsStop;
00192     } else if(lumiLimitReached()) {
00193       // If the maximum lumi limit has been reached, stop
00194       // when reaching a new file, run, or lumi.
00195       if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
00196         state_ = IsStop;
00197       } else {
00198         ItemType newState = nextItemType_();
00199         if(newState == IsEvent) {
00200           assert (processingMode() == RunsLumisAndEvents);
00201           state_ = IsEvent;
00202         } else {
00203           state_ = IsStop;
00204         }
00205       }
00206     } else {
00207       ItemType newState = nextItemType_();
00208       if(newState == IsStop) {
00209         state_ = IsStop;
00210       } else if(newState == IsFile || oldState == IsInvalid) {
00211         state_ = IsFile;
00212       } else if(newState == IsRun || oldState == IsFile) {
00213         runAuxiliary_ = readRunAuxiliary();
00214         state_ = IsRun;
00215       } else if(newState == IsLumi || oldState == IsRun) {
00216         assert (processingMode() != Runs);
00217         lumiAuxiliary_ = readLuminosityBlockAuxiliary();
00218         state_ = IsLumi;
00219       } else {
00220         assert (processingMode() == RunsLumisAndEvents);
00221         state_ = IsEvent;
00222       }
00223     }
00224     if(state_ == IsStop) {
00225       lumiAuxiliary_.reset();
00226       runAuxiliary_.reset();
00227     }
00228     return state_;
00229   }
00230 
00231   void
00232   InputSource::doBeginJob() {
00233     this->beginJob();
00234   }
00235 
00236   void
00237   InputSource::doEndJob() {
00238     endJob();
00239   }
00240 
00241   void
00242   InputSource::registerProducts() {
00243     if(!typeLabelList().empty()) {
00244       addToRegistry(typeLabelList().begin(), typeLabelList().end(), moduleDescription(), productRegistryUpdate());
00245     }
00246   }
00247 
00248   // Return a dummy file block.
00249   boost::shared_ptr<FileBlock>
00250   InputSource::readFile() {
00251     assert(doneReadAhead_);
00252     assert(state_ == IsFile);
00253     assert(!limitReached());
00254     doneReadAhead_ = false;
00255     boost::shared_ptr<FileBlock> fb = readFile_();
00256     return fb;
00257   }
00258 
00259   void
00260   InputSource::closeFile(boost::shared_ptr<FileBlock> fb) {
00261     fb->close();
00262     closeFile_();
00263     return;
00264   }
00265 
00266   // Return a dummy file block.
00267   // This function must be overridden for any input source that reads a file
00268   // containing Products.
00269   boost::shared_ptr<FileBlock>
00270   InputSource::readFile_() {
00271     return boost::shared_ptr<FileBlock>(new FileBlock);
00272   }
00273 
00274   boost::shared_ptr<RunPrincipal> const
00275   InputSource::runPrincipal() const {
00276     return principalCache_->runPrincipalPtr();
00277   }
00278 
00279   boost::shared_ptr<LuminosityBlockPrincipal> const
00280   InputSource::luminosityBlockPrincipal() const {
00281     return principalCache_->lumiPrincipalPtr();
00282   }
00283 
00284   void
00285   InputSource::readAndCacheRun() {
00286     if(runPrematurelyRead_) {
00287       runPrematurelyRead_ = false;
00288       return;
00289     }
00290     RunSourceSentry(*this);
00291     bool merged = principalCache_->merge(runAuxiliary(), productRegistry_);
00292     if(!merged) {
00293       boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(runAuxiliary(), productRegistry_, processConfiguration()));
00294       principalCache_->insert(rp);
00295     }
00296     readRun_(principalCache_->runPrincipalPtr());
00297   }
00298 
00299   int
00300   InputSource::markRun() {
00301     assert(doneReadAhead_);
00302     assert(state_ == IsRun);
00303     assert(!limitReached());
00304     doneReadAhead_ = false;
00305     return principalCache_->runPrincipal().run();
00306   }
00307 
00308   void
00309   InputSource::readAndCacheLumi() {
00310     if(lumiPrematurelyRead_) {
00311       lumiPrematurelyRead_ = false;
00312       return;
00313     }
00314     LumiSourceSentry(*this);
00315     bool merged = principalCache_->merge(luminosityBlockAuxiliary(), productRegistry_);
00316     if(!merged) {
00317       boost::shared_ptr<LuminosityBlockPrincipal> lb(
00318         new LuminosityBlockPrincipal(luminosityBlockAuxiliary(),
00319                                      productRegistry_,
00320                                      processConfiguration(),
00321                                      principalCache_->runPrincipalPtr()));
00322       principalCache_->insert(lb);
00323     }
00324     readLuminosityBlock_(principalCache_->lumiPrincipalPtr());
00325   }
00326 
00327   int
00328   InputSource::markLumi() {
00329     assert(doneReadAhead_);
00330     assert(state_ == IsLumi);
00331     assert(!limitReached());
00332     doneReadAhead_ = false;
00333     --remainingLumis_;
00334     assert(principalCache_->lumiPrincipal().luminosityBlock() == luminosityBlockAuxiliary()->luminosityBlock());
00335     return principalCache_->lumiPrincipal().luminosityBlock();
00336   }
00337 
00338   boost::shared_ptr<RunPrincipal>
00339   InputSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00340     // Note: For the moment, we do not support saving and restoring the state of the
00341     // random number generator if random numbers are generated during processing of runs
00342     // (e.g. beginRun(), endRun())
00343     rpCache->fillRunPrincipal();
00344     return rpCache;
00345   }
00346 
00347   boost::shared_ptr<LuminosityBlockPrincipal>
00348   InputSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00349     lbCache->fillLuminosityBlockPrincipal();
00350     return lbCache;
00351   }
00352 
00353   EventPrincipal*
00354   InputSource::readEvent(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00355     assert(doneReadAhead_);
00356     assert(state_ == IsEvent);
00357     assert(!eventLimitReached());
00358     doneReadAhead_ = false;
00359 
00360     EventPrincipal* result = readEvent_();
00361     if(result != 0) {
00362       assert(lbCache->run() == result->run());
00363       assert(lbCache->luminosityBlock() == result->luminosityBlock());
00364       Event event(*result, moduleDescription());
00365       postRead(event);
00366       if(remainingEvents_ > 0) --remainingEvents_;
00367       ++readCount_;
00368       setTimestamp(result->time());
00369       issueReports(result->id());
00370     }
00371     return result;
00372   }
00373 
00374   EventPrincipal*
00375   InputSource::readEvent(EventID const& eventID) {
00376     EventPrincipal* result = 0;
00377 
00378     if(!limitReached()) {
00379       result = readIt(eventID);
00380       if(result != 0) {
00381         Event event(*result, moduleDescription());
00382         postRead(event);
00383         if(remainingEvents_ > 0) --remainingEvents_;
00384         ++readCount_;
00385         issueReports(result->id());
00386       }
00387     }
00388     return result;
00389   }
00390 
00391   void
00392   InputSource::skipEvents(int offset) {
00393     doneReadAhead_ = false;
00394     this->skip(offset);
00395   }
00396 
00397   bool
00398   InputSource::goToEvent(EventID const& eventID) {
00399     doneReadAhead_ = false;
00400     return this->goToEvent_(eventID);
00401   }
00402 
00403   void
00404   InputSource::issueReports(EventID const& eventID) {
00405     if(isInfoEnabled()) {
00406       LogVerbatim("FwkReport") << "Begin processing the " << readCount_
00407                                << suffix(readCount_) << " record. Run " << eventID.run()
00408                                << ", Event " << eventID.event()
00409                                << ", LumiSection " << eventID.luminosityBlock()
00410                                << " at " << std::setprecision(3) << TimeOfDay();
00411     }
00412     if(!statusFileName_.empty()) {
00413       std::ofstream statusFile(statusFileName_.c_str());
00414       statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00415       statusFile.close();
00416     }
00417 
00418     // At some point we may want to initiate checkpointing here
00419   }
00420 
00421   EventPrincipal*
00422   InputSource::readIt(EventID const&) {
00423     throw Exception(errors::LogicError)
00424       << "InputSource::readIt()\n"
00425       << "Random access is not implemented for this type of Input Source\n"
00426       << "Contact a Framework Developer\n";
00427   }
00428 
00429   void
00430   InputSource::setRun(RunNumber_t) {
00431     throw Exception(errors::LogicError)
00432       << "InputSource::setRun()\n"
00433       << "Run number cannot be modified for this type of Input Source\n"
00434       << "Contact a Framework Developer\n";
00435   }
00436 
00437   void
00438   InputSource::setLumi(LuminosityBlockNumber_t) {
00439     throw Exception(errors::LogicError)
00440       << "InputSource::setLumi()\n"
00441       << "Luminosity Block ID cannot be modified for this type of Input Source\n"
00442       << "Contact a Framework Developer\n";
00443   }
00444 
00445   void
00446   InputSource::skip(int) {
00447     throw Exception(errors::LogicError)
00448       << "InputSource::skip()\n"
00449       << "Random access is not implemented for this type of Input Source\n"
00450       << "Contact a Framework Developer\n";
00451   }
00452 
00453   bool
00454   InputSource::goToEvent_(EventID const& eventID) {
00455     throw Exception(errors::LogicError)
00456       << "InputSource::goToEvent_()\n"
00457       << "Random access is not implemented for this type of Input Source\n"
00458       << "Contact a Framework Developer\n";
00459     return true;
00460   }
00461 
00462 
00463   void
00464   InputSource::rewind_() {
00465     throw Exception(errors::LogicError)
00466       << "InputSource::rewind()\n"
00467       << "Rewind is not implemented for this type of Input Source\n"
00468       << "Contact a Framework Developer\n";
00469   }
00470 
00471   void
00472   InputSource::decreaseRemainingEventsBy(int iSkipped) {
00473     if(-1 == remainingEvents_) {
00474       return;
00475     }
00476     if(iSkipped < remainingEvents_) {
00477       remainingEvents_ -= iSkipped;
00478     } else {
00479       remainingEvents_ = 0;
00480     }
00481   }
00482 
00483   void
00484   InputSource::postRead(Event& event) {
00485     Service<RandomNumberGenerator> rng;
00486     if(rng.isAvailable()) {
00487       rng->postEventRead(event);
00488     }
00489   }
00490 
00491   void
00492   InputSource::doBeginRun(RunPrincipal& rp) {
00493     Run run(rp, moduleDescription());
00494     beginRun(run);
00495     run.commit_();
00496   }
00497 
00498   void
00499   InputSource::doEndRun(RunPrincipal& rp) {
00500     rp.setEndTime(time_);
00501     Run run(rp, moduleDescription());
00502     endRun(run);
00503     run.commit_();
00504     runPrematurelyRead_ = false;
00505   }
00506 
00507   void
00508   InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp) {
00509     LuminosityBlock lb(lbp, moduleDescription());
00510     beginLuminosityBlock(lb);
00511     lb.commit_();
00512   }
00513 
00514   void
00515   InputSource::doEndLumi(LuminosityBlockPrincipal& lbp) {
00516     lbp.setEndTime(time_);
00517     LuminosityBlock lb(lbp, moduleDescription());
00518     endLuminosityBlock(lb);
00519     lb.commit_();
00520     lumiPrematurelyRead_ = false;
00521   }
00522 
00523   void
00524   InputSource::doPreForkReleaseResources() {
00525     preForkReleaseResources();
00526   }
00527 
00528   void
00529   InputSource::doPostForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00530     postForkReacquireResources(iReceiver);
00531   }
00532 
00533   void
00534   InputSource::wakeUp_() {}
00535 
00536   void
00537   InputSource::beginLuminosityBlock(LuminosityBlock&) {}
00538 
00539   void
00540   InputSource::endLuminosityBlock(LuminosityBlock&) {}
00541 
00542   void
00543   InputSource::beginRun(Run&) {}
00544 
00545   void
00546   InputSource::endRun(Run&) {}
00547 
00548   void
00549   InputSource::beginJob() {}
00550 
00551   void
00552   InputSource::endJob() {}
00553 
00554   void
00555   InputSource::preForkReleaseResources() {}
00556 
00557   void
00558   InputSource::postForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource>) {}
00559 
00560   bool
00561   InputSource::randomAccess_() const {
00562     return false;
00563   }
00564 
00565   ProcessingController::ForwardState
00566   InputSource::forwardState_() const {
00567     return ProcessingController::kUnknownForward;
00568   }
00569 
00570   ProcessingController::ReverseState
00571   InputSource::reverseState_() const {
00572     return ProcessingController::kUnknownReverse;
00573   }
00574 
00575   ProcessHistoryID const&
00576   InputSource::processHistoryID() const {
00577     assert(runAuxiliary());
00578     return runAuxiliary()->processHistoryID();
00579   }
00580 
00581   RunNumber_t
00582   InputSource::run() const {
00583     assert(runAuxiliary());
00584     return runAuxiliary()->run();
00585   }
00586 
00587   LuminosityBlockNumber_t
00588   InputSource::luminosityBlock() const {
00589     assert(luminosityBlockAuxiliary());
00590     return luminosityBlockAuxiliary()->luminosityBlock();
00591   }
00592 
00593   InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
00594     pre();
00595   }
00596 
00597   InputSource::SourceSentry::~SourceSentry() {
00598     post_();
00599   }
00600 
00601   InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source) :
00602      sentry_(source.actReg()->preSourceSignal_, source.actReg()->postSourceSignal_) {
00603   }
00604 
00605   InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source) :
00606      sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
00607   }
00608 
00609   InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source) :
00610      sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
00611   }
00612 
00613   InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source) :
00614      sentry_(source.actReg()->preOpenFileSignal_, source.actReg()->postOpenFileSignal_) {
00615   }
00616 
00617   InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source) :
00618      sentry_(source.actReg()->preCloseFileSignal_, source.actReg()->postCloseFileSignal_) {
00619   }
00620 }