00001
00002
00003 #include "FWCore/Framework/interface/InputSource.h"
00004
00005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
00006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00007 #include "FWCore/Framework/interface/Event.h"
00008 #include "FWCore/Framework/interface/EventPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "FWCore/Framework/interface/InputSourceDescription.h"
00011 #include "FWCore/Framework/interface/LuminosityBlock.h"
00012 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00013 #include "FWCore/Framework/interface/Run.h"
00014 #include "FWCore/Framework/interface/RunPrincipal.h"
00015 #include "FWCore/Framework/src/PrincipalCache.h"
00016 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00017 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00018 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00019 #include "FWCore/ServiceRegistry/interface/Service.h"
00020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00021 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00022 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00023 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
00024 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00025 #include "FWCore/Utilities/interface/TimeOfDay.h"
00026
00027 #include <cassert>
00028 #include <fstream>
00029 #include <iomanip>
00030
00031 namespace edm {
00032
00033 namespace {
00034 std::string const& suffix(int count) {
00035 static std::string const st("st");
00036 static std::string const nd("nd");
00037 static std::string const rd("rd");
00038 static std::string const th("th");
00039
00040 int lastDigit = count % 10;
00041 if(lastDigit >= 4 || lastDigit == 0) return th;
00042
00043 if(count % 100 - lastDigit == 10) return th;
00044 return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
00045 }
00046 template <typename T>
00047 boost::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
00048 return boost::shared_ptr<T>(ptr, do_nothing_deleter());
00049 }
00050
00051 ProcessHistoryID
00052 deleteFromProcessHistory(ProcessHistoryID const& phid, std::string const& processName) {
00053
00054
00055
00056 if(!phid.isValid()) {
00057 return phid;
00058 }
00059 ProcessHistory ph;
00060 bool found = ProcessHistoryRegistry::instance()->getMapped(phid, ph);
00061 assert(found);
00062 ProcessHistory newPH;
00063 newPH.reserve(ph.size());
00064 for(ProcessHistory::const_iterator it = ph.begin(), itEnd = ph.end(); it != itEnd; ++it) {
00065 if(processName != it->processName()) {
00066 newPH.push_back(*it);
00067 }
00068 }
00069 ProcessHistoryRegistry::instance()->insertMapped(newPH);
00070 return newPH.id();
00071 }
00072 }
00073
00074 InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00075 ProductRegistryHelper(),
00076 actReg_(desc.actReg_),
00077 principalCache_(desc.principalCache_),
00078 maxEvents_(desc.maxEvents_),
00079 remainingEvents_(maxEvents_),
00080 maxLumis_(desc.maxLumis_),
00081 remainingLumis_(maxLumis_),
00082 readCount_(0),
00083 processingMode_(RunsLumisAndEvents),
00084 moduleDescription_(desc.moduleDescription_),
00085 productRegistry_(createSharedPtrToStatic<ProductRegistry const>(desc.productRegistry_)),
00086 primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
00087 processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
00088 time_(),
00089 doneReadAhead_(false),
00090 state_(IsInvalid),
00091 runAuxiliary_(),
00092 lumiAuxiliary_(),
00093 runPrematurelyRead_(false),
00094 lumiPrematurelyRead_(false),
00095 statusFileName_() {
00096
00097 if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
00098 std::ostringstream statusfilename;
00099 statusfilename << "source_" << getpid();
00100 statusFileName_ = statusfilename.str();
00101 }
00102
00103
00104 if(primary_) {
00105 assert(desc.productRegistry_ != 0);
00106 }
00107 std::string const defaultMode("RunsLumisAndEvents");
00108 std::string const runMode("Runs");
00109 std::string const runLumiMode("RunsAndLumis");
00110
00111
00112
00113
00114
00115
00116 std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
00117 if(processingMode == runMode) {
00118 processingMode_ = Runs;
00119 } else if(processingMode == runLumiMode) {
00120 processingMode_ = RunsAndLumis;
00121 } else if(processingMode != defaultMode) {
00122 throw Exception(errors::Configuration)
00123 << "InputSource::InputSource()\n"
00124 << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
00125 << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
00126 }
00127 }
00128
00129 InputSource::~InputSource() {}
00130
00131 void
00132 InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
00133 ParameterSetDescription desc;
00134 desc.setUnknown();
00135 descriptions.addDefault(desc);
00136 }
00137
00138 static std::string const kBaseType("Source");
00139
00140 std::string const&
00141 InputSource::baseType() {
00142 return kBaseType;
00143 }
00144
00145 void
00146 InputSource::fillDescription(ParameterSetDescription& desc) {
00147 std::string defaultString("RunsLumisAndEvents");
00148 desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
00149 "'RunsLumisAndEvents': process runs, lumis, and events.\n"
00150 "'RunsAndLumis': process runs and lumis (not events).\n"
00151 "'Runs': process runs (not lumis or events).");
00152 desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
00153 }
00154
00155 EventPrincipal* const
00156 InputSource::eventPrincipalCache() {
00157 return &principalCache().eventPrincipal();
00158 }
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168 InputSource::ItemType
00169 InputSource::nextItemType_() {
00170 ItemType itemType = getNextItemType();
00171 if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
00172 readEvent_();
00173 return nextItemType_();
00174 }
00175 if(itemType == IsLumi && processingMode() == Runs) {
00176
00177 return nextItemType_();
00178 }
00179 return itemType;
00180 }
00181
00182 InputSource::ItemType
00183 InputSource::nextItemType() {
00184 if(doneReadAhead_) {
00185 return state_;
00186 }
00187 doneReadAhead_ = true;
00188 ItemType oldState = state_;
00189 if(eventLimitReached()) {
00190
00191 state_ = IsStop;
00192 } else if(lumiLimitReached()) {
00193
00194
00195 if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
00196 state_ = IsStop;
00197 } else {
00198 ItemType newState = nextItemType_();
00199 if(newState == IsEvent) {
00200 assert (processingMode() == RunsLumisAndEvents);
00201 state_ = IsEvent;
00202 } else {
00203 state_ = IsStop;
00204 }
00205 }
00206 } else {
00207 ItemType newState = nextItemType_();
00208 if(newState == IsStop) {
00209 state_ = IsStop;
00210 } else if(newState == IsFile || oldState == IsInvalid) {
00211 state_ = IsFile;
00212 } else if(newState == IsRun || oldState == IsFile) {
00213 runAuxiliary_ = readRunAuxiliary();
00214 state_ = IsRun;
00215 } else if(newState == IsLumi || oldState == IsRun) {
00216 assert (processingMode() != Runs);
00217 lumiAuxiliary_ = readLuminosityBlockAuxiliary();
00218 state_ = IsLumi;
00219 } else {
00220 assert (processingMode() == RunsLumisAndEvents);
00221 state_ = IsEvent;
00222 }
00223 }
00224 if(state_ == IsStop) {
00225 lumiAuxiliary_.reset();
00226 runAuxiliary_.reset();
00227 }
00228 return state_;
00229 }
00230
00231 void
00232 InputSource::doBeginJob() {
00233 this->beginJob();
00234 }
00235
00236 void
00237 InputSource::doEndJob() {
00238 endJob();
00239 }
00240
00241 void
00242 InputSource::registerProducts() {
00243 if(!typeLabelList().empty()) {
00244 addToRegistry(typeLabelList().begin(), typeLabelList().end(), moduleDescription(), productRegistryUpdate());
00245 }
00246 }
00247
00248
00249 boost::shared_ptr<FileBlock>
00250 InputSource::readFile() {
00251 assert(doneReadAhead_);
00252 assert(state_ == IsFile);
00253 assert(!limitReached());
00254 doneReadAhead_ = false;
00255 boost::shared_ptr<FileBlock> fb = readFile_();
00256 return fb;
00257 }
00258
00259 void
00260 InputSource::closeFile(boost::shared_ptr<FileBlock> fb) {
00261 fb->close();
00262 closeFile_();
00263 return;
00264 }
00265
00266
00267
00268
00269 boost::shared_ptr<FileBlock>
00270 InputSource::readFile_() {
00271 return boost::shared_ptr<FileBlock>(new FileBlock);
00272 }
00273
00274 boost::shared_ptr<RunPrincipal> const
00275 InputSource::runPrincipal() const {
00276 return principalCache_->runPrincipalPtr();
00277 }
00278
00279 boost::shared_ptr<LuminosityBlockPrincipal> const
00280 InputSource::luminosityBlockPrincipal() const {
00281 return principalCache_->lumiPrincipalPtr();
00282 }
00283
00284 void
00285 InputSource::readAndCacheRun() {
00286 if(runPrematurelyRead_) {
00287 runPrematurelyRead_ = false;
00288 return;
00289 }
00290 RunSourceSentry(*this);
00291 bool merged = principalCache_->merge(runAuxiliary(), productRegistry_);
00292 if(!merged) {
00293 boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(runAuxiliary(), productRegistry_, processConfiguration()));
00294 principalCache_->insert(rp);
00295 }
00296 readRun_(principalCache_->runPrincipalPtr());
00297 }
00298
00299 int
00300 InputSource::markRun() {
00301 assert(doneReadAhead_);
00302 assert(state_ == IsRun);
00303 assert(!limitReached());
00304 doneReadAhead_ = false;
00305 return principalCache_->runPrincipal().run();
00306 }
00307
00308 void
00309 InputSource::readAndCacheLumi() {
00310 if(lumiPrematurelyRead_) {
00311 lumiPrematurelyRead_ = false;
00312 return;
00313 }
00314 LumiSourceSentry(*this);
00315 bool merged = principalCache_->merge(luminosityBlockAuxiliary(), productRegistry_);
00316 if(!merged) {
00317 boost::shared_ptr<LuminosityBlockPrincipal> lb(
00318 new LuminosityBlockPrincipal(luminosityBlockAuxiliary(),
00319 productRegistry_,
00320 processConfiguration(),
00321 principalCache_->runPrincipalPtr()));
00322 principalCache_->insert(lb);
00323 }
00324 readLuminosityBlock_(principalCache_->lumiPrincipalPtr());
00325 }
00326
00327 int
00328 InputSource::markLumi() {
00329 assert(doneReadAhead_);
00330 assert(state_ == IsLumi);
00331 assert(!limitReached());
00332 doneReadAhead_ = false;
00333 --remainingLumis_;
00334 assert(principalCache_->lumiPrincipal().luminosityBlock() == luminosityBlockAuxiliary()->luminosityBlock());
00335 return principalCache_->lumiPrincipal().luminosityBlock();
00336 }
00337
00338 boost::shared_ptr<RunPrincipal>
00339 InputSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00340
00341
00342
00343 rpCache->fillRunPrincipal();
00344 return rpCache;
00345 }
00346
00347 boost::shared_ptr<LuminosityBlockPrincipal>
00348 InputSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00349 lbCache->fillLuminosityBlockPrincipal();
00350 return lbCache;
00351 }
00352
00353 EventPrincipal*
00354 InputSource::readEvent(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00355 assert(doneReadAhead_);
00356 assert(state_ == IsEvent);
00357 assert(!eventLimitReached());
00358 doneReadAhead_ = false;
00359
00360 EventPrincipal* result = readEvent_();
00361 if(result != 0) {
00362 assert(lbCache->run() == result->run());
00363 assert(lbCache->luminosityBlock() == result->luminosityBlock());
00364 Event event(*result, moduleDescription());
00365 postRead(event);
00366 if(remainingEvents_ > 0) --remainingEvents_;
00367 ++readCount_;
00368 setTimestamp(result->time());
00369 issueReports(result->id());
00370 }
00371 return result;
00372 }
00373
00374 EventPrincipal*
00375 InputSource::readEvent(EventID const& eventID) {
00376 EventPrincipal* result = 0;
00377
00378 if(!limitReached()) {
00379 result = readIt(eventID);
00380 if(result != 0) {
00381 Event event(*result, moduleDescription());
00382 postRead(event);
00383 if(remainingEvents_ > 0) --remainingEvents_;
00384 ++readCount_;
00385 issueReports(result->id());
00386 }
00387 }
00388 return result;
00389 }
00390
00391 void
00392 InputSource::skipEvents(int offset) {
00393 doneReadAhead_ = false;
00394 this->skip(offset);
00395 }
00396
00397 bool
00398 InputSource::goToEvent(EventID const& eventID) {
00399 doneReadAhead_ = false;
00400 return this->goToEvent_(eventID);
00401 }
00402
00403 void
00404 InputSource::issueReports(EventID const& eventID) {
00405 if(isInfoEnabled()) {
00406 LogVerbatim("FwkReport") << "Begin processing the " << readCount_
00407 << suffix(readCount_) << " record. Run " << eventID.run()
00408 << ", Event " << eventID.event()
00409 << ", LumiSection " << eventID.luminosityBlock()
00410 << " at " << std::setprecision(3) << TimeOfDay();
00411 }
00412 if(!statusFileName_.empty()) {
00413 std::ofstream statusFile(statusFileName_.c_str());
00414 statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00415 statusFile.close();
00416 }
00417
00418
00419 }
00420
00421 EventPrincipal*
00422 InputSource::readIt(EventID const&) {
00423 throw Exception(errors::LogicError)
00424 << "InputSource::readIt()\n"
00425 << "Random access is not implemented for this type of Input Source\n"
00426 << "Contact a Framework Developer\n";
00427 }
00428
00429 void
00430 InputSource::setRun(RunNumber_t) {
00431 throw Exception(errors::LogicError)
00432 << "InputSource::setRun()\n"
00433 << "Run number cannot be modified for this type of Input Source\n"
00434 << "Contact a Framework Developer\n";
00435 }
00436
00437 void
00438 InputSource::setLumi(LuminosityBlockNumber_t) {
00439 throw Exception(errors::LogicError)
00440 << "InputSource::setLumi()\n"
00441 << "Luminosity Block ID cannot be modified for this type of Input Source\n"
00442 << "Contact a Framework Developer\n";
00443 }
00444
00445 void
00446 InputSource::skip(int) {
00447 throw Exception(errors::LogicError)
00448 << "InputSource::skip()\n"
00449 << "Random access is not implemented for this type of Input Source\n"
00450 << "Contact a Framework Developer\n";
00451 }
00452
00453 bool
00454 InputSource::goToEvent_(EventID const& eventID) {
00455 throw Exception(errors::LogicError)
00456 << "InputSource::goToEvent_()\n"
00457 << "Random access is not implemented for this type of Input Source\n"
00458 << "Contact a Framework Developer\n";
00459 return true;
00460 }
00461
00462
00463 void
00464 InputSource::rewind_() {
00465 throw Exception(errors::LogicError)
00466 << "InputSource::rewind()\n"
00467 << "Rewind is not implemented for this type of Input Source\n"
00468 << "Contact a Framework Developer\n";
00469 }
00470
00471 void
00472 InputSource::decreaseRemainingEventsBy(int iSkipped) {
00473 if(-1 == remainingEvents_) {
00474 return;
00475 }
00476 if(iSkipped < remainingEvents_) {
00477 remainingEvents_ -= iSkipped;
00478 } else {
00479 remainingEvents_ = 0;
00480 }
00481 }
00482
00483 void
00484 InputSource::postRead(Event& event) {
00485 Service<RandomNumberGenerator> rng;
00486 if(rng.isAvailable()) {
00487 rng->postEventRead(event);
00488 }
00489 }
00490
00491 void
00492 InputSource::doBeginRun(RunPrincipal& rp) {
00493 Run run(rp, moduleDescription());
00494 beginRun(run);
00495 run.commit_();
00496 }
00497
00498 void
00499 InputSource::doEndRun(RunPrincipal& rp) {
00500 rp.setEndTime(time_);
00501 Run run(rp, moduleDescription());
00502 endRun(run);
00503 run.commit_();
00504 runPrematurelyRead_ = false;
00505 }
00506
00507 void
00508 InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp) {
00509 LuminosityBlock lb(lbp, moduleDescription());
00510 beginLuminosityBlock(lb);
00511 lb.commit_();
00512 }
00513
00514 void
00515 InputSource::doEndLumi(LuminosityBlockPrincipal& lbp) {
00516 lbp.setEndTime(time_);
00517 LuminosityBlock lb(lbp, moduleDescription());
00518 endLuminosityBlock(lb);
00519 lb.commit_();
00520 lumiPrematurelyRead_ = false;
00521 }
00522
00523 void
00524 InputSource::doPreForkReleaseResources() {
00525 preForkReleaseResources();
00526 }
00527
00528 void
00529 InputSource::doPostForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00530 postForkReacquireResources(iReceiver);
00531 }
00532
00533 void
00534 InputSource::wakeUp_() {}
00535
00536 void
00537 InputSource::beginLuminosityBlock(LuminosityBlock&) {}
00538
00539 void
00540 InputSource::endLuminosityBlock(LuminosityBlock&) {}
00541
00542 void
00543 InputSource::beginRun(Run&) {}
00544
00545 void
00546 InputSource::endRun(Run&) {}
00547
00548 void
00549 InputSource::beginJob() {}
00550
00551 void
00552 InputSource::endJob() {}
00553
00554 void
00555 InputSource::preForkReleaseResources() {}
00556
00557 void
00558 InputSource::postForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource>) {}
00559
00560 bool
00561 InputSource::randomAccess_() const {
00562 return false;
00563 }
00564
00565 ProcessingController::ForwardState
00566 InputSource::forwardState_() const {
00567 return ProcessingController::kUnknownForward;
00568 }
00569
00570 ProcessingController::ReverseState
00571 InputSource::reverseState_() const {
00572 return ProcessingController::kUnknownReverse;
00573 }
00574
00575 ProcessHistoryID const&
00576 InputSource::processHistoryID() const {
00577 assert(runAuxiliary());
00578 return runAuxiliary()->processHistoryID();
00579 }
00580
00581 RunNumber_t
00582 InputSource::run() const {
00583 assert(runAuxiliary());
00584 return runAuxiliary()->run();
00585 }
00586
00587 LuminosityBlockNumber_t
00588 InputSource::luminosityBlock() const {
00589 assert(luminosityBlockAuxiliary());
00590 return luminosityBlockAuxiliary()->luminosityBlock();
00591 }
00592
00593 InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
00594 pre();
00595 }
00596
00597 InputSource::SourceSentry::~SourceSentry() {
00598 post_();
00599 }
00600
00601 InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source) :
00602 sentry_(source.actReg()->preSourceSignal_, source.actReg()->postSourceSignal_) {
00603 }
00604
00605 InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source) :
00606 sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
00607 }
00608
00609 InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source) :
00610 sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
00611 }
00612
00613 InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source) :
00614 sentry_(source.actReg()->preOpenFileSignal_, source.actReg()->postOpenFileSignal_) {
00615 }
00616
00617 InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source) :
00618 sentry_(source.actReg()->preCloseFileSignal_, source.actReg()->postCloseFileSignal_) {
00619 }
00620 }