00001
00002
00003 #include "FWCore/Framework/interface/InputSource.h"
00004
00005 #include "DataFormats/Provenance/interface/ProcessHistory.h"
00006 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00007 #include "DataFormats/Provenance/interface/FullHistoryToReducedHistoryMap.h"
00008 #include "FWCore/Framework/interface/Event.h"
00009 #include "FWCore/Framework/interface/EventPrincipal.h"
00010 #include "FWCore/Framework/interface/FileBlock.h"
00011 #include "FWCore/Framework/interface/InputSourceDescription.h"
00012 #include "FWCore/Framework/interface/LuminosityBlock.h"
00013 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00014 #include "FWCore/Framework/interface/Run.h"
00015 #include "FWCore/Framework/interface/RunPrincipal.h"
00016 #include "FWCore/Framework/src/PrincipalCache.h"
00017 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00018 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00019 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00020 #include "FWCore/ServiceRegistry/interface/Service.h"
00021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00023 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
00024 #include "FWCore/Utilities/interface/GlobalIdentifier.h"
00025 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00026 #include "FWCore/Utilities/interface/TimeOfDay.h"
00027
00028 #include <cassert>
00029 #include <fstream>
00030 #include <iomanip>
00031
00032 namespace edm {
00033
00034 namespace {
00035 std::string const& suffix(int count) {
00036 static std::string const st("st");
00037 static std::string const nd("nd");
00038 static std::string const rd("rd");
00039 static std::string const th("th");
00040
00041 int lastDigit = count % 10;
00042 if(lastDigit >= 4 || lastDigit == 0) return th;
00043
00044 if(count % 100 - lastDigit == 10) return th;
00045 return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
00046 }
00047 template <typename T>
00048 boost::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
00049 return boost::shared_ptr<T>(ptr, do_nothing_deleter());
00050 }
00051 }
00052
00053 InputSource::InputSource(ParameterSet const& pset, InputSourceDescription const& desc) :
00054 ProductRegistryHelper(),
00055 actReg_(desc.actReg_),
00056 principalCache_(desc.principalCache_),
00057 maxEvents_(desc.maxEvents_),
00058 remainingEvents_(maxEvents_),
00059 maxLumis_(desc.maxLumis_),
00060 remainingLumis_(maxLumis_),
00061 readCount_(0),
00062 processingMode_(RunsLumisAndEvents),
00063 moduleDescription_(desc.moduleDescription_),
00064 productRegistry_(createSharedPtrToStatic<ProductRegistry const>(desc.productRegistry_)),
00065 primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
00066 processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
00067 time_(),
00068 doneReadAhead_(false),
00069 state_(IsInvalid),
00070 runAuxiliary_(),
00071 lumiAuxiliary_(),
00072 statusFileName_() {
00073
00074 if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
00075 std::ostringstream statusfilename;
00076 statusfilename << "source_" << getpid();
00077 statusFileName_ = statusfilename.str();
00078 }
00079
00080
00081 if(primary_) {
00082 assert(desc.productRegistry_ != 0);
00083 }
00084 std::string const defaultMode("RunsLumisAndEvents");
00085 std::string const runMode("Runs");
00086 std::string const runLumiMode("RunsAndLumis");
00087
00088
00089
00090
00091
00092
00093 std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
00094 if(processingMode == runMode) {
00095 processingMode_ = Runs;
00096 } else if(processingMode == runLumiMode) {
00097 processingMode_ = RunsAndLumis;
00098 } else if(processingMode != defaultMode) {
00099 throw Exception(errors::Configuration)
00100 << "InputSource::InputSource()\n"
00101 << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
00102 << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
00103 }
00104 }
00105
00106 InputSource::~InputSource() {}
00107
00108 void
00109 InputSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
00110 ParameterSetDescription desc;
00111 desc.setUnknown();
00112 descriptions.addDefault(desc);
00113 }
00114
00115 void
00116 InputSource::prevalidate(ConfigurationDescriptions& ) {
00117 }
00118
00119
00120 static std::string const kBaseType("Source");
00121
00122 std::string const&
00123 InputSource::baseType() {
00124 return kBaseType;
00125 }
00126
00127 void
00128 InputSource::fillDescription(ParameterSetDescription& desc) {
00129 std::string defaultString("RunsLumisAndEvents");
00130 desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
00131 "'RunsLumisAndEvents': process runs, lumis, and events.\n"
00132 "'RunsAndLumis': process runs and lumis (not events).\n"
00133 "'Runs': process runs (not lumis or events).");
00134 desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
00135 }
00136
00137 EventPrincipal*
00138 InputSource::eventPrincipalCache() {
00139 return &principalCache().eventPrincipal();
00140 }
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 InputSource::ItemType
00151 InputSource::nextItemType_() {
00152 ItemType itemType = getNextItemType();
00153 if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
00154 readEvent_();
00155 return nextItemType_();
00156 }
00157 if(itemType == IsLumi && processingMode() == Runs) {
00158
00159 return nextItemType_();
00160 }
00161 return itemType;
00162 }
00163
00164 InputSource::ItemType
00165 InputSource::nextItemType() {
00166 if(doneReadAhead_) {
00167 return state_;
00168 }
00169 doneReadAhead_ = true;
00170 ItemType oldState = state_;
00171 if(eventLimitReached()) {
00172
00173 state_ = IsStop;
00174 } else if(lumiLimitReached()) {
00175
00176
00177 if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
00178 state_ = IsStop;
00179 } else {
00180 ItemType newState = nextItemType_();
00181 if(newState == IsEvent) {
00182 assert (processingMode() == RunsLumisAndEvents);
00183 state_ = IsEvent;
00184 } else {
00185 state_ = IsStop;
00186 }
00187 }
00188 } else {
00189 ItemType newState = nextItemType_();
00190 if(newState == IsStop) {
00191 state_ = IsStop;
00192 } else if(newState == IsFile || oldState == IsInvalid) {
00193 state_ = IsFile;
00194 } else if(newState == IsRun || oldState == IsFile) {
00195 runAuxiliary_ = readRunAuxiliary();
00196 state_ = IsRun;
00197 } else if(newState == IsLumi || oldState == IsRun) {
00198 assert (processingMode() != Runs);
00199 lumiAuxiliary_ = readLuminosityBlockAuxiliary();
00200 state_ = IsLumi;
00201 } else {
00202 assert (processingMode() == RunsLumisAndEvents);
00203 state_ = IsEvent;
00204 }
00205 }
00206 if(state_ == IsStop) {
00207 lumiAuxiliary_.reset();
00208 runAuxiliary_.reset();
00209 }
00210 return state_;
00211 }
00212
00213 void
00214 InputSource::doBeginJob() {
00215 this->beginJob();
00216 }
00217
00218 void
00219 InputSource::doEndJob() {
00220 endJob();
00221 }
00222
00223 void
00224 InputSource::registerProducts() {
00225 if(!typeLabelList().empty()) {
00226 addToRegistry(typeLabelList().begin(), typeLabelList().end(), moduleDescription(), productRegistryUpdate());
00227 }
00228 }
00229
00230
00231 boost::shared_ptr<FileBlock>
00232 InputSource::readFile() {
00233 assert(doneReadAhead_);
00234 assert(state_ == IsFile);
00235 assert(!limitReached());
00236 doneReadAhead_ = false;
00237 boost::shared_ptr<FileBlock> fb = readFile_();
00238 return fb;
00239 }
00240
00241 void
00242 InputSource::closeFile(boost::shared_ptr<FileBlock> fb) {
00243 fb->close();
00244 closeFile_();
00245 return;
00246 }
00247
00248
00249
00250
00251 boost::shared_ptr<FileBlock>
00252 InputSource::readFile_() {
00253 return boost::shared_ptr<FileBlock>(new FileBlock);
00254 }
00255
00256 boost::shared_ptr<RunPrincipal> const
00257 InputSource::runPrincipal() const {
00258 return principalCache_->runPrincipalPtr();
00259 }
00260
00261 boost::shared_ptr<LuminosityBlockPrincipal> const
00262 InputSource::luminosityBlockPrincipal() const {
00263 return principalCache_->lumiPrincipalPtr();
00264 }
00265
00266 void
00267 InputSource::readAndCacheRun(bool merge, HistoryAppender& historyAppender) {
00268 RunSourceSentry(*this);
00269 if (merge) {
00270 principalCache_->merge(runAuxiliary(), productRegistry_);
00271 } else {
00272 boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(runAuxiliary(), productRegistry_, processConfiguration(), &historyAppender));
00273 principalCache_->insert(rp);
00274 }
00275 readRun_(principalCache_->runPrincipalPtr());
00276 }
00277
00278 int
00279 InputSource::markRun() {
00280 assert(doneReadAhead_);
00281 assert(state_ == IsRun);
00282 assert(!limitReached());
00283 doneReadAhead_ = false;
00284 return principalCache_->runPrincipal().run();
00285 }
00286
00287 void
00288 InputSource::readAndCacheLumi(bool merge, HistoryAppender& historyAppender) {
00289 LumiSourceSentry(*this);
00290 if (merge) {
00291 principalCache_->merge(luminosityBlockAuxiliary(), productRegistry_);
00292 } else {
00293 boost::shared_ptr<LuminosityBlockPrincipal> lb(
00294 new LuminosityBlockPrincipal(luminosityBlockAuxiliary(),
00295 productRegistry_,
00296 processConfiguration(),
00297 principalCache_->runPrincipalPtr(),
00298 &historyAppender));
00299 principalCache_->insert(lb);
00300 }
00301 readLuminosityBlock_(principalCache_->lumiPrincipalPtr());
00302 }
00303
00304 int
00305 InputSource::markLumi() {
00306 assert(doneReadAhead_);
00307 assert(state_ == IsLumi);
00308 assert(!limitReached());
00309 doneReadAhead_ = false;
00310 --remainingLumis_;
00311 assert(principalCache_->lumiPrincipal().luminosityBlock() == luminosityBlockAuxiliary()->luminosityBlock());
00312 return principalCache_->lumiPrincipal().luminosityBlock();
00313 }
00314
00315 boost::shared_ptr<RunPrincipal>
00316 InputSource::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00317
00318
00319
00320 rpCache->fillRunPrincipal();
00321 return rpCache;
00322 }
00323
00324 boost::shared_ptr<LuminosityBlockPrincipal>
00325 InputSource::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00326 lbCache->fillLuminosityBlockPrincipal();
00327 return lbCache;
00328 }
00329
00330 EventPrincipal*
00331 InputSource::readEvent(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00332 assert(doneReadAhead_);
00333 assert(state_ == IsEvent);
00334 assert(!eventLimitReached());
00335 doneReadAhead_ = false;
00336
00337 EventPrincipal* result = readEvent_();
00338 if(result != 0) {
00339 assert(result->luminosityBlockPrincipalPtrValid());
00340 assert(lbCache->run() == result->run());
00341 assert(lbCache->luminosityBlock() == result->luminosityBlock());
00342 Event event(*result, moduleDescription());
00343 postRead(event);
00344 if(remainingEvents_ > 0) --remainingEvents_;
00345 ++readCount_;
00346 setTimestamp(result->time());
00347 issueReports(result->id());
00348 }
00349 return result;
00350 }
00351
00352 EventPrincipal*
00353 InputSource::readEvent(EventID const& eventID) {
00354 EventPrincipal* result = 0;
00355
00356 if(!limitReached()) {
00357 result = readIt(eventID);
00358 if(result != 0) {
00359 Event event(*result, moduleDescription());
00360 postRead(event);
00361 if(remainingEvents_ > 0) --remainingEvents_;
00362 ++readCount_;
00363 issueReports(result->id());
00364 }
00365 }
00366 return result;
00367 }
00368
00369 void
00370 InputSource::skipEvents(int offset) {
00371 doneReadAhead_ = false;
00372 this->skip(offset);
00373 }
00374
00375 bool
00376 InputSource::goToEvent(EventID const& eventID) {
00377 doneReadAhead_ = false;
00378 return this->goToEvent_(eventID);
00379 }
00380
00381 void
00382 InputSource::issueReports(EventID const& eventID) {
00383 if(isInfoEnabled()) {
00384 LogVerbatim("FwkReport") << "Begin processing the " << readCount_
00385 << suffix(readCount_) << " record. Run " << eventID.run()
00386 << ", Event " << eventID.event()
00387 << ", LumiSection " << eventID.luminosityBlock()
00388 << " at " << std::setprecision(3) << TimeOfDay();
00389 }
00390 if(!statusFileName_.empty()) {
00391 std::ofstream statusFile(statusFileName_.c_str());
00392 statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00393 statusFile.close();
00394 }
00395
00396
00397 }
00398
00399 EventPrincipal*
00400 InputSource::readIt(EventID const&) {
00401 throw Exception(errors::LogicError)
00402 << "InputSource::readIt()\n"
00403 << "Random access is not implemented for this type of Input Source\n"
00404 << "Contact a Framework Developer\n";
00405 }
00406
00407 void
00408 InputSource::setRun(RunNumber_t) {
00409 throw Exception(errors::LogicError)
00410 << "InputSource::setRun()\n"
00411 << "Run number cannot be modified for this type of Input Source\n"
00412 << "Contact a Framework Developer\n";
00413 }
00414
00415 void
00416 InputSource::setLumi(LuminosityBlockNumber_t) {
00417 throw Exception(errors::LogicError)
00418 << "InputSource::setLumi()\n"
00419 << "Luminosity Block ID cannot be modified for this type of Input Source\n"
00420 << "Contact a Framework Developer\n";
00421 }
00422
00423 void
00424 InputSource::skip(int) {
00425 throw Exception(errors::LogicError)
00426 << "InputSource::skip()\n"
00427 << "Random access is not implemented for this type of Input Source\n"
00428 << "Contact a Framework Developer\n";
00429 }
00430
00431 bool
00432 InputSource::goToEvent_(EventID const&) {
00433 throw Exception(errors::LogicError)
00434 << "InputSource::goToEvent_()\n"
00435 << "Random access is not implemented for this type of Input Source\n"
00436 << "Contact a Framework Developer\n";
00437 return true;
00438 }
00439
00440 void
00441 InputSource::rewind_() {
00442 throw Exception(errors::LogicError)
00443 << "InputSource::rewind()\n"
00444 << "Rewind is not implemented for this type of Input Source\n"
00445 << "Contact a Framework Developer\n";
00446 }
00447
00448 void
00449 InputSource::decreaseRemainingEventsBy(int iSkipped) {
00450 if(-1 == remainingEvents_) {
00451 return;
00452 }
00453 if(iSkipped < remainingEvents_) {
00454 remainingEvents_ -= iSkipped;
00455 } else {
00456 remainingEvents_ = 0;
00457 }
00458 }
00459
00460 void
00461 InputSource::postRead(Event& event) {
00462 Service<RandomNumberGenerator> rng;
00463 if(rng.isAvailable()) {
00464 rng->postEventRead(event);
00465 }
00466 }
00467
00468 void
00469 InputSource::doBeginRun(RunPrincipal& rp) {
00470 Run run(rp, moduleDescription());
00471 beginRun(run);
00472 run.commit_();
00473 }
00474
00475 void
00476 InputSource::doEndRun(RunPrincipal& rp) {
00477 rp.setEndTime(time_);
00478 Run run(rp, moduleDescription());
00479 endRun(run);
00480 run.commit_();
00481 }
00482
00483 void
00484 InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp) {
00485 LuminosityBlock lb(lbp, moduleDescription());
00486 beginLuminosityBlock(lb);
00487 lb.commit_();
00488 }
00489
00490 void
00491 InputSource::doEndLumi(LuminosityBlockPrincipal& lbp) {
00492 lbp.setEndTime(time_);
00493 LuminosityBlock lb(lbp, moduleDescription());
00494 endLuminosityBlock(lb);
00495 lb.commit_();
00496 }
00497
00498 void
00499 InputSource::doPreForkReleaseResources() {
00500 preForkReleaseResources();
00501 }
00502
00503 void
00504 InputSource::doPostForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
00505 postForkReacquireResources(iReceiver);
00506 }
00507
00508 void
00509 InputSource::wakeUp_() {}
00510
00511 void
00512 InputSource::beginLuminosityBlock(LuminosityBlock&) {}
00513
00514 void
00515 InputSource::endLuminosityBlock(LuminosityBlock&) {}
00516
00517 void
00518 InputSource::beginRun(Run&) {}
00519
00520 void
00521 InputSource::endRun(Run&) {}
00522
00523 void
00524 InputSource::beginJob() {}
00525
00526 void
00527 InputSource::endJob() {}
00528
00529 void
00530 InputSource::preForkReleaseResources() {}
00531
00532 void
00533 InputSource::postForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource>) {}
00534
00535 bool
00536 InputSource::randomAccess_() const {
00537 return false;
00538 }
00539
00540 ProcessingController::ForwardState
00541 InputSource::forwardState_() const {
00542 return ProcessingController::kUnknownForward;
00543 }
00544
00545 ProcessingController::ReverseState
00546 InputSource::reverseState_() const {
00547 return ProcessingController::kUnknownReverse;
00548 }
00549
00550 ProcessHistoryID const&
00551 InputSource::reducedProcessHistoryID() const {
00552 assert(runAuxiliary());
00553 return ProcessHistoryRegistry::instance()->extra().reduceProcessHistoryID(runAuxiliary()->processHistoryID());
00554 }
00555
00556 RunNumber_t
00557 InputSource::run() const {
00558 assert(runAuxiliary());
00559 return runAuxiliary()->run();
00560 }
00561
00562 LuminosityBlockNumber_t
00563 InputSource::luminosityBlock() const {
00564 assert(luminosityBlockAuxiliary());
00565 return luminosityBlockAuxiliary()->luminosityBlock();
00566 }
00567
00568 InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
00569 pre();
00570 }
00571
00572 InputSource::SourceSentry::~SourceSentry() {
00573 post_();
00574 }
00575
00576 InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source) :
00577 sentry_(source.actReg()->preSourceSignal_, source.actReg()->postSourceSignal_) {
00578 }
00579
00580 InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source) :
00581 sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
00582 }
00583
00584 InputSource::RunSourceSentry::RunSourceSentry(InputSource const& source) :
00585 sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
00586 }
00587
00588 InputSource::FileOpenSentry::FileOpenSentry(InputSource const& source) :
00589 sentry_(source.actReg()->preOpenFileSignal_, source.actReg()->postOpenFileSignal_) {
00590 }
00591
00592 InputSource::FileCloseSentry::FileCloseSentry(InputSource const& source) :
00593 sentry_(source.actReg()->preCloseFileSignal_, source.actReg()->postCloseFileSignal_) {
00594 }
00595 }