CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_2_9/src/FWCore/Framework/src/InputSource.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "FWCore/Framework/interface/InputSource.h"
00004 
00005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
00006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00007 #include "DataFormats/Provenance/interface/FullHistoryToReducedHistoryMap.h"
00008 #include "FWCore/Framework/interface/Event.h"
00009 #include "FWCore/Framework/interface/EventPrincipal.h"
00010 #include "FWCore/Framework/interface/FileBlock.h"
00011 #include "FWCore/Framework/interface/InputSourceDescription.h"
00012 #include "FWCore/Framework/interface/LuminosityBlock.h"
00013 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00014 #include "FWCore/Framework/interface/Run.h"
00015 #include "FWCore/Framework/interface/RunPrincipal.h"
00016 #include "FWCore/Framework/src/PrincipalCache.h"
00017 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00020 #include "FWCore/ServiceRegistry/interface/Service.h"
00021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00023 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00024 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
00025 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00026 #include "FWCore/Utilities/interface/TimeOfDay.h"
00027 
00028 #include <cassert>
00029 #include <fstream>
00030 #include <iomanip>
00031 
00032 namespace edm {
00033 
00034   namespace {
00035         std::string const& suffix(int count) {
00036           static std::string const st("st");
00037           static std::string const nd("nd");
00038           static std::string const rd("rd");
00039           static std::string const th("th");
00040           // *0, *4 - *9 use "th".
00041           int lastDigit = count % 10;
00042           if(lastDigit >= 4 || lastDigit == 0) return th;
00043           // *11, *12, or *13 use "th".
00044           if(count % 100 - lastDigit == 10) return th;
00045           return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
00046         }
00047         template <typename T>
00048         boost::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
00049           return boost::shared_ptr<T>(ptr, do_nothing_deleter());
00050         }
00051   }
00052 
00053   InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00054       ProductRegistryHelper(),
00055       actReg_(desc.actReg_),
00056       principalCache_(desc.principalCache_),
00057       maxEvents_(desc.maxEvents_),
00058       remainingEvents_(maxEvents_),
00059       maxLumis_(desc.maxLumis_),
00060       remainingLumis_(maxLumis_),
00061       readCount_(0),
00062       processingMode_(RunsLumisAndEvents),
00063       moduleDescription_(desc.moduleDescription_),
00064       productRegistry_(createSharedPtrToStatic<ProductRegistry const>(desc.productRegistry_)),
00065       primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
00066       processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
00067       time_(),
00068       doneReadAhead_(false),
00069       state_(IsInvalid),
00070       runAuxiliary_(),
00071       lumiAuxiliary_(),
00072       statusFileName_() {
00073 
00074     if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
00075       std::ostringstream statusfilename;
00076       statusfilename << "source_" << getpid();
00077       statusFileName_ = statusfilename.str();
00078     }
00079 
00080     // Secondary input sources currently do not have a product registry.
00081     if(primary_) {
00082       assert(desc.productRegistry_ != 0);
00083     }
00084     std::string const defaultMode("RunsLumisAndEvents");
00085     std::string const runMode("Runs");
00086     std::string const runLumiMode("RunsAndLumis");
00087 
00088     // The default value provided as the second argument to the getUntrackedParameter function call
00089     // is not used when the ParameterSet has been validated and the parameters are not optional
00090     // in the description.  As soon as all primary input sources and all modules with a secondary
00091     // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
00092     // calls can and should be deleted from the code.
00093     std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
00094     if(processingMode == runMode) {
00095       processingMode_ = Runs;
00096     } else if(processingMode == runLumiMode) {
00097       processingMode_ = RunsAndLumis;
00098     } else if(processingMode != defaultMode) {
00099       throw Exception(errors::Configuration)
00100         << "InputSource::InputSource()\n"
00101         << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
00102         << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
00103     }
00104   }
00105 
00106   InputSource::~InputSource() {}
00107 
00108   void
00109   InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
00110     ParameterSetDescription desc;
00111     desc.setUnknown();
00112     descriptions.addDefault(desc);
00113   }
00114 
00115   void
00116   InputSource::prevalidate(ConfigurationDescriptions& ) {
00117   }
00118   
00119 
00120   static std::string const kBaseType("Source");
00121 
00122   std::string const&
00123   InputSource::baseType() {
00124     return kBaseType;
00125   }
00126 
00127   void
00128   InputSource::fillDescription(ParameterSetDescription& desc) {
00129     std::string defaultString("RunsLumisAndEvents");
00130     desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
00131     "'RunsLumisAndEvents': process runs, lumis, and events.\n"
00132     "'RunsAndLumis':       process runs and lumis (not events).\n"
00133     "'Runs':               process runs (not lumis or events).");
00134     desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
00135   }
00136 
00137   EventPrincipal*
00138   InputSource::eventPrincipalCache() {
00139     return &principalCache().eventPrincipal();
00140   }
00141 
00142   // This next function is to guarantee that "runs only" mode does not return events or lumis,
00143   // and that "runs and lumis only" mode does not return events.
00144   // For input sources that are not random access (e.g. you need to read through the events
00145   // to get to the lumis and runs), this is all that is involved to implement these modes.
00146   // For input sources where events or lumis can be skipped, getNextItemType() should
00147   // implement the skipping internally, so that the performance gain is realized.
00148   // If this is done for a source, the 'if' blocks in this function will never be entered
00149   // for that source.
00150   InputSource::ItemType
00151   InputSource::nextItemType_() {
00152     ItemType itemType = getNextItemType();
00153     if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
00154       readEvent_();
00155       return nextItemType_();
00156     }
00157     if(itemType == IsLumi && processingMode() == Runs) {
00158       // QQQ skipLuminosityBlock_();
00159       return nextItemType_();
00160     }
00161     return itemType;
00162   }
00163 
00164   InputSource::ItemType
00165   InputSource::nextItemType() {
00166     if(doneReadAhead_) {
00167       return state_;
00168     }
00169     doneReadAhead_ = true;
00170     ItemType oldState = state_;
00171     if(eventLimitReached()) {
00172       // If the maximum event limit has been reached, stop.
00173       state_ = IsStop;
00174     } else if(lumiLimitReached()) {
00175       // If the maximum lumi limit has been reached, stop
00176       // when reaching a new file, run, or lumi.
00177       if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
00178         state_ = IsStop;
00179       } else {
00180         ItemType newState = nextItemType_();
00181         if(newState == IsEvent) {
00182           assert (processingMode() == RunsLumisAndEvents);
00183           state_ = IsEvent;
00184         } else {
00185           state_ = IsStop;
00186         }
00187       }
00188     } else {
00189       ItemType newState = nextItemType_();
00190       if(newState == IsStop) {
00191         state_ = IsStop;
00192       } else if(newState == IsFile || oldState == IsInvalid) {
00193         state_ = IsFile;
00194       } else if(newState == IsRun || oldState == IsFile) {
00195         runAuxiliary_ = readRunAuxiliary();
00196         state_ = IsRun;
00197       } else if(newState == IsLumi || oldState == IsRun) {
00198         assert (processingMode() != Runs);
00199         lumiAuxiliary_ = readLuminosityBlockAuxiliary();
00200         state_ = IsLumi;
00201       } else {
00202         assert (processingMode() == RunsLumisAndEvents);
00203         state_ = IsEvent;
00204       }
00205     }
00206     if(state_ == IsStop) {
00207       lumiAuxiliary_.reset();
00208       runAuxiliary_.reset();
00209     }
00210     return state_;
00211   }
00212 
00213   void
00214   InputSource::doBeginJob() {
00215     this->beginJob();
00216   }
00217 
00218   void
00219   InputSource::doEndJob() {
00220     endJob();
00221   }
00222 
00223   void
00224   InputSource::registerProducts() {
00225     if(!typeLabelList().empty()) {
00226       addToRegistry(typeLabelList().begin(), typeLabelList().end(), moduleDescription(), productRegistryUpdate());
00227     }
00228   }
00229 
00230   // Return a dummy file block.
00231   boost::shared_ptr<FileBlock>
00232   InputSource::readFile() {
00233     assert(doneReadAhead_);
00234     assert(state_ == IsFile);
00235     assert(!limitReached());
00236     doneReadAhead_ = false;
00237     boost::shared_ptr<FileBlock> fb = readFile_();
00238     return fb;
00239   }
00240 
00241   void
00242   InputSource::closeFile(boost::shared_ptr<FileBlock> fb) {
00243     fb->close();
00244     closeFile_();
00245     return;
00246   }
00247 
00248   // Return a dummy file block.
00249   // This function must be overridden for any input source that reads a file
00250   // containing Products.
00251   boost::shared_ptr<FileBlock>
00252   InputSource::readFile_() {
00253     return boost::shared_ptr<FileBlock>(new FileBlock);
00254   }
00255 
00256   boost::shared_ptr<RunPrincipal> const
00257   InputSource::runPrincipal() const {
00258     return principalCache_->runPrincipalPtr();
00259   }
00260 
00261   boost::shared_ptr<LuminosityBlockPrincipal> const
00262   InputSource::luminosityBlockPrincipal() const {
00263     return principalCache_->lumiPrincipalPtr();
00264   }
00265 
00266   void
00267   InputSource::readAndCacheRun(bool merge, HistoryAppender& historyAppender) {
00268     RunSourceSentry(*this);
00269     if (merge) {
00270       principalCache_->merge(runAuxiliary(), productRegistry_);
00271     } else {
00272       boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(runAuxiliary(), productRegistry_, processConfiguration(), &historyAppender));
00273       principalCache_->insert(rp);
00274     }
00275     readRun_(principalCache_->runPrincipalPtr());
00276   }
00277 
00278   int
00279   InputSource::markRun() {
00280     assert(doneReadAhead_);
00281     assert(state_ == IsRun);
00282     assert(!limitReached());
00283     doneReadAhead_ = false;
00284     return principalCache_->runPrincipal().run();
00285   }
00286 
00287   void
00288   InputSource::readAndCacheLumi(bool merge, HistoryAppender& historyAppender) {
00289     LumiSourceSentry(*this);
00290     if (merge) {
00291       principalCache_->merge(luminosityBlockAuxiliary(), productRegistry_);
00292     } else {
00293       boost::shared_ptr<LuminosityBlockPrincipal> lb(
00294         new LuminosityBlockPrincipal(luminosityBlockAuxiliary(),
00295                                      productRegistry_,
00296                                      processConfiguration(),
00297                                      principalCache_->runPrincipalPtr(),
00298                                      &historyAppender));
00299       principalCache_->insert(lb);
00300     }
00301     readLuminosityBlock_(principalCache_->lumiPrincipalPtr());
00302   }
00303 
00304   int
00305   InputSource::markLumi() {
00306     assert(doneReadAhead_);
00307     assert(state_ == IsLumi);
00308     assert(!limitReached());
00309     doneReadAhead_ = false;
00310     --remainingLumis_;
00311     assert(principalCache_->lumiPrincipal().luminosityBlock() == luminosityBlockAuxiliary()->luminosityBlock());
00312     return principalCache_->lumiPrincipal().luminosityBlock();
00313   }
00314 
00315   boost::shared_ptr<RunPrincipal>
00316   InputSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00317     // Note: For the moment, we do not support saving and restoring the state of the
00318     // random number generator if random numbers are generated during processing of runs
00319     // (e.g. beginRun(), endRun())
00320     rpCache->fillRunPrincipal();
00321     return rpCache;
00322   }
00323 
00324   boost::shared_ptr<LuminosityBlockPrincipal>
00325   InputSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00326     lbCache->fillLuminosityBlockPrincipal();
00327     return lbCache;
00328   }
00329 
00330   EventPrincipal*
00331   InputSource::readEvent(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00332     assert(doneReadAhead_);
00333     assert(state_ == IsEvent);
00334     assert(!eventLimitReached());
00335     doneReadAhead_ = false;
00336 
00337     EventPrincipal* result = readEvent_();
00338     if(result != 0) {
00339       assert(result->luminosityBlockPrincipalPtrValid());
00340       assert(lbCache->run() == result->run());
00341       assert(lbCache->luminosityBlock() == result->luminosityBlock());
00342       Event event(*result, moduleDescription());
00343       postRead(event);
00344       if(remainingEvents_ > 0) --remainingEvents_;
00345       ++readCount_;
00346       setTimestamp(result->time());
00347       issueReports(result->id());
00348     }
00349     return result;
00350   }
00351 
00352   EventPrincipal*
00353   InputSource::readEvent(EventID const& eventID) {
00354     EventPrincipal* result = 0;
00355 
00356     if(!limitReached()) {
00357       result = readIt(eventID);
00358       if(result != 0) {
00359         Event event(*result, moduleDescription());
00360         postRead(event);
00361         if(remainingEvents_ > 0) --remainingEvents_;
00362         ++readCount_;
00363         issueReports(result->id());
00364       }
00365     }
00366     return result;
00367   }
00368 
00369   void
00370   InputSource::skipEvents(int offset) {
00371     doneReadAhead_ = false;
00372     this->skip(offset);
00373   }
00374 
00375   bool
00376   InputSource::goToEvent(EventID const& eventID) {
00377     doneReadAhead_ = false;
00378     return this->goToEvent_(eventID);
00379   }
00380 
00381   void
00382   InputSource::issueReports(EventID const& eventID) {
00383     if(isInfoEnabled()) {
00384       LogVerbatim("FwkReport") << "Begin processing the " << readCount_
00385                                << suffix(readCount_) << " record. Run " << eventID.run()
00386                                << ", Event " << eventID.event()
00387                                << ", LumiSection " << eventID.luminosityBlock()
00388                                << " at " << std::setprecision(3) << TimeOfDay();
00389     }
00390     if(!statusFileName_.empty()) {
00391       std::ofstream statusFile(statusFileName_.c_str());
00392       statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00393       statusFile.close();
00394     }
00395 
00396     // At some point we may want to initiate checkpointing here
00397   }
00398 
00399   EventPrincipal*
00400   InputSource::readIt(EventID const&) {
00401     throw Exception(errors::LogicError)
00402       << "InputSource::readIt()\n"
00403       << "Random access is not implemented for this type of Input Source\n"
00404       << "Contact a Framework Developer\n";
00405   }
00406 
00407   void
00408   InputSource::setRun(RunNumber_t) {
00409     throw Exception(errors::LogicError)
00410       << "InputSource::setRun()\n"
00411       << "Run number cannot be modified for this type of Input Source\n"
00412       << "Contact a Framework Developer\n";
00413   }
00414 
00415   void
00416   InputSource::setLumi(LuminosityBlockNumber_t) {
00417     throw Exception(errors::LogicError)
00418       << "InputSource::setLumi()\n"
00419       << "Luminosity Block ID cannot be modified for this type of Input Source\n"
00420       << "Contact a Framework Developer\n";
00421   }
00422 
00423   void
00424   InputSource::skip(int) {
00425     throw Exception(errors::LogicError)
00426       << "InputSource::skip()\n"
00427       << "Random access is not implemented for this type of Input Source\n"
00428       << "Contact a Framework Developer\n";
00429   }
00430 
00431   bool
00432   InputSource::goToEvent_(EventID const&) {
00433     throw Exception(errors::LogicError)
00434       << "InputSource::goToEvent_()\n"
00435       << "Random access is not implemented for this type of Input Source\n"
00436       << "Contact a Framework Developer\n";
00437     return true;
00438   }
00439 
00440   void
00441   InputSource::rewind_() {
00442     throw Exception(errors::LogicError)
00443       << "InputSource::rewind()\n"
00444       << "Rewind is not implemented for this type of Input Source\n"
00445       << "Contact a Framework Developer\n";
00446   }
00447 
00448   void
00449   InputSource::decreaseRemainingEventsBy(int iSkipped) {
00450     if(-1 == remainingEvents_) {
00451       return;
00452     }
00453     if(iSkipped < remainingEvents_) {
00454       remainingEvents_ -= iSkipped;
00455     } else {
00456       remainingEvents_ = 0;
00457     }
00458   }
00459 
00460   void
00461   InputSource::postRead(Event& event) {
00462     Service<RandomNumberGenerator> rng;
00463     if(rng.isAvailable()) {
00464       rng->postEventRead(event);
00465     }
00466   }
00467 
00468   void
00469   InputSource::doBeginRun(RunPrincipal& rp) {
00470     Run run(rp, moduleDescription());
00471     beginRun(run);
00472     run.commit_();
00473   }
00474 
00475   void
00476   InputSource::doEndRun(RunPrincipal& rp) {
00477     rp.setEndTime(time_);
00478     Run run(rp, moduleDescription());
00479     endRun(run);
00480     run.commit_();
00481   }
00482 
00483   void
00484   InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp) {
00485     LuminosityBlock lb(lbp, moduleDescription());
00486     beginLuminosityBlock(lb);
00487     lb.commit_();
00488   }
00489 
00490   void
00491   InputSource::doEndLumi(LuminosityBlockPrincipal& lbp) {
00492     lbp.setEndTime(time_);
00493     LuminosityBlock lb(lbp, moduleDescription());
00494     endLuminosityBlock(lb);
00495     lb.commit_();
00496   }
00497 
00498   void
00499   InputSource::doPreForkReleaseResources() {
00500     preForkReleaseResources();
00501   }
00502 
00503   void
00504   InputSource::doPostForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00505     postForkReacquireResources(iReceiver);
00506   }
00507 
00508   void
00509   InputSource::wakeUp_() {}
00510 
00511   void
00512   InputSource::beginLuminosityBlock(LuminosityBlock&) {}
00513 
00514   void
00515   InputSource::endLuminosityBlock(LuminosityBlock&) {}
00516 
00517   void
00518   InputSource::beginRun(Run&) {}
00519 
00520   void
00521   InputSource::endRun(Run&) {}
00522 
00523   void
00524   InputSource::beginJob() {}
00525 
00526   void
00527   InputSource::endJob() {}
00528 
00529   void
00530   InputSource::preForkReleaseResources() {}
00531 
00532   void
00533   InputSource::postForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource>) {}
00534 
00535   bool
00536   InputSource::randomAccess_() const {
00537     return false;
00538   }
00539 
00540   ProcessingController::ForwardState
00541   InputSource::forwardState_() const {
00542     return ProcessingController::kUnknownForward;
00543   }
00544 
00545   ProcessingController::ReverseState
00546   InputSource::reverseState_() const {
00547     return ProcessingController::kUnknownReverse;
00548   }
00549 
00550   ProcessHistoryID const&
00551   InputSource::reducedProcessHistoryID() const {
00552     assert(runAuxiliary());
00553     return ProcessHistoryRegistry::instance()->extra().reduceProcessHistoryID(runAuxiliary()->processHistoryID());
00554   }
00555 
00556   RunNumber_t
00557   InputSource::run() const {
00558     assert(runAuxiliary());
00559     return runAuxiliary()->run();
00560   }
00561 
00562   LuminosityBlockNumber_t
00563   InputSource::luminosityBlock() const {
00564     assert(luminosityBlockAuxiliary());
00565     return luminosityBlockAuxiliary()->luminosityBlock();
00566   }
00567 
00568   InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
00569     pre();
00570   }
00571 
00572   InputSource::SourceSentry::~SourceSentry() {
00573     post_();
00574   }
00575 
00576   InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source) :
00577      sentry_(source.actReg()->preSourceSignal_, source.actReg()->postSourceSignal_) {
00578   }
00579 
00580   InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source) :
00581      sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
00582   }
00583 
00584   InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source) :
00585      sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
00586   }
00587 
00588   InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source) :
00589      sentry_(source.actReg()->preOpenFileSignal_, source.actReg()->postOpenFileSignal_) {
00590   }
00591 
00592   InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source) :
00593      sentry_(source.actReg()->preCloseFileSignal_, source.actReg()->postCloseFileSignal_) {
00594   }
00595 }