![]() |
![]() |
#include <DiskWriter.h>
Writes events to disk
It gets the next event from the StreamQueue and writes it to the appropriate stream file(s) on disk.
Definition at line 42 of file DiskWriter.h.
typedef boost::shared_ptr<StreamHandler> stor::DiskWriter::StreamHandlerPtr [private] |
Definition at line 153 of file DiskWriter.h.
typedef std::vector<StreamHandlerPtr> stor::DiskWriter::StreamHandlers [private] |
Definition at line 154 of file DiskWriter.h.
stor::DiskWriter::DiskWriter | ( | xdaq::Application * | app, |
SharedResourcesPtr | sr | ||
) |
Definition at line 24 of file DiskWriter.cc.
References stor::WorkerThreadParams::DWdeqWaitTime_, sharedResources_, and timeout_.
: app_(app),
  sharedResources_(sr),
  dbFileHandler_(new DbFileHandler()),
  runNumber_(0),
  lastFileTimeoutCheckTime_(utils::getCurrentTime()),
  endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()),
  actionIsActive_(true),
  // The workloop pointer is only assigned in startWorkLoop(); start from a
  // null pointer instead of leaving the member with an indeterminate value.
  writingWL_(0)
{
  // Seed timeout_ with the DWdeqWaitTime_ value from the configured
  // worker thread parameters.
  WorkerThreadParams workerParams =
    sharedResources_->configuration_->getWorkerThreadParams();
  timeout_ = workerParams.DWdeqWaitTime_;
}
stor::DiskWriter::~DiskWriter | ( | ) |
Definition at line 39 of file DiskWriter.cc.
References actionIsActive_, destroyStreams(), and writingWL_.
{
  // Stop the activity: writeAction() returns this flag to the workloop.
  actionIsActive_ = false;

  // Cancel the workloop (will wait until the action has finished).
  // Skip the call when no workloop pointer is set.
  if (writingWL_)
  {
    writingWL_->cancel();
  }

  // Destroy any remaining streams. Under normal conditions, there should be none
  destroyStreams();
}
stor::DiskWriter::DiskWriter | ( | DiskWriter const & | ) | [private] |
void stor::DiskWriter::checkForFileTimeOuts | ( | const bool | doItNow = false | ) | [private] |
Close old files if fileClosingTestInterval has passed or do it now if argument is true
Definition at line 201 of file DiskWriter.cc.
References closeTimedOutFiles(), dwParams_, stor::DiskWritingParams::fileClosingTestInterval_, stor::utils::getCurrentTime(), lastFileTimeoutCheckTime_, and cmsPerfSuiteHarvest::now.
Referenced by writeNextEvent().
{
  // Read the clock once; the same value is used for the interval test,
  // handed to closeTimedOutFiles() and stored as the last check time.
  const utils::TimePoint_t currentTime = utils::getCurrentTime();

  // Nothing to do unless forced or the test interval has been exceeded.
  if ( !doItNow &&
       !((currentTime - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_) )
  {
    return;
  }

  closeTimedOutFiles(currentTime);
  lastFileTimeoutCheckTime_ = currentTime;
}
void stor::DiskWriter::checkStreamChangeRequest | ( | ) | [private] |
Reconfigure streams if a request is pending
Definition at line 173 of file DiskWriter.cc.
References configureErrorStreams(), configureEventStreams(), dbFileHandler_, destroyStreams(), dwParams_, makeFaultyEventStream(), runNumber_, sharedResources_, and timeout_.
Referenced by writeNextEvent().
{
  // Locals handed to streamChangeRequested(); they are only read after
  // that call has returned true.
  EvtStrConfigListPtr eventConfigs;
  ErrStrConfigListPtr errorConfigs;
  DiskWritingParams requestedParams;
  unsigned int requestedRunNumber;
  boost::posix_time::time_duration requestedTimeout;
  bool reconfigure;

  if ( !sharedResources_->diskWriterResources_->streamChangeRequested(
         reconfigure, eventConfigs, errorConfigs,
         requestedParams, requestedRunNumber, requestedTimeout) )
  {
    // No request pending.
    return;
  }

  // Any stream change starts by closing the current streams.
  destroyStreams();

  if ( reconfigure )
  {
    // Adopt the new settings before building the new handlers.
    dwParams_ = requestedParams;
    runNumber_ = requestedRunNumber;
    timeout_ = requestedTimeout;
    dbFileHandler_->configure(runNumber_, dwParams_);

    // Handler order: faulty-event stream, then event streams, then error streams.
    makeFaultyEventStream();
    configureEventStreams(eventConfigs);
    configureErrorStreams(errorConfigs);
  }

  // Tell the shared resources that the request has been handled.
  sharedResources_->diskWriterResources_->streamChangeDone();
}
void stor::DiskWriter::closeTimedOutFiles | ( | const utils::TimePoint_t | now | ) | [private] |
Close all timed-out files
Definition at line 213 of file DiskWriter.cc.
References stor::StreamHandler::closeTimedOutFiles(), and streamHandlers_.
Referenced by checkForFileTimeOuts().
{
  // Forward the given time point to every registered stream handler.
  for ( StreamHandlers::const_iterator handler = streamHandlers_.begin(),
          handlersEnd = streamHandlers_.end();
        handler != handlersEnd;
        ++handler )
  {
    (*handler)->closeTimedOutFiles(now);
  }
}
void stor::DiskWriter::configureErrorStreams | ( | ErrStrConfigListPtr | cfgList | ) | [private] |
Configures the error streams to be written to disk
Definition at line 235 of file DiskWriter.cc.
References makeErrorStream().
Referenced by checkStreamChangeRequest().
{
  // Create one error stream handler per configuration entry, in list order.
  ErrStrConfigList::iterator cfg = cfgList->begin();
  const ErrStrConfigList::iterator cfgEnd = cfgList->end();
  while ( cfg != cfgEnd )
  {
    makeErrorStream(*cfg);
    ++cfg;
  }
}
void stor::DiskWriter::configureEventStreams | ( | EvtStrConfigListPtr | cfgList | ) | [private] |
Configures the event streams to be written to disk
Definition at line 220 of file DiskWriter.cc.
References makeEventStream().
Referenced by checkStreamChangeRequest().
{
  // Walk the configuration list in order; only entries with a positive
  // fractionToDisk() get an event stream handler.
  EvtStrConfigList::iterator cfg = cfgList->begin();
  const EvtStrConfigList::iterator cfgEnd = cfgList->end();
  while ( cfg != cfgEnd )
  {
    if ( cfg->fractionToDisk() > 0 )
    {
      makeEventStream(*cfg);
    }
    ++cfg;
  }
}
void stor::DiskWriter::destroyStreams | ( | ) | [private] |
Gracefully close all streams
Definition at line 280 of file DiskWriter.cc.
References stor::StreamHandler::closeAllFiles(), reportRemainingLumiSections(), streamHandlers_, and writeEndOfRunMarker().
Referenced by checkStreamChangeRequest(), and ~DiskWriter().
{
  // Without any stream handlers nothing is closed and no end-of-run
  // information is written.
  if ( !streamHandlers_.empty() )
  {
    // Close the files of every handler before dropping the handlers.
    for ( StreamHandlers::const_iterator handler = streamHandlers_.begin(),
            handlersEnd = streamHandlers_.end();
          handler != handlersEnd;
          ++handler )
    {
      (*handler)->closeAllFiles();
    }
    streamHandlers_.clear();

    // Report outstanding lumi sections first, then the end-of-run marker.
    reportRemainingLumiSections();
    writeEndOfRunMarker();
  }
}
void stor::DiskWriter::makeErrorStream | ( | ErrorStreamConfigurationInfo & | streamCfg | ) | [private] |
Creates the handler for the given error event stream
Definition at line 270 of file DiskWriter.cc.
References dbFileHandler_, stor::ErrorStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.
Referenced by configureErrorStreams().
{
  // The new handler is appended, so its index is the current handler count.
  const StreamHandlers::size_type newStreamId = streamHandlers_.size();

  boost::shared_ptr<FRDStreamHandler> errorHandler(
    new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
  );
  streamHandlers_.push_back(
    boost::dynamic_pointer_cast<StreamHandler>(errorHandler)
  );

  // Record the handler's position in streamHandlers_ as the stream id.
  streamCfg.setStreamId(newStreamId);
}
void stor::DiskWriter::makeEventStream | ( | EventStreamConfigurationInfo & | streamCfg | ) | [private] |
Creates the handler for the given event stream
Definition at line 260 of file DiskWriter.cc.
References dbFileHandler_, stor::EventStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.
Referenced by configureEventStreams().
{
  // The new handler is appended, so its index is the current handler count.
  const StreamHandlers::size_type newStreamId = streamHandlers_.size();

  boost::shared_ptr<EventStreamHandler> eventHandler(
    new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_)
  );
  streamHandlers_.push_back(
    boost::dynamic_pointer_cast<StreamHandler>(eventHandler)
  );

  // Record the handler's position in streamHandlers_ as the stream id.
  streamCfg.setStreamId(newStreamId);
}
void stor::DiskWriter::makeFaultyEventStream | ( | ) | [private] |
Creates the handler for faulty events detected by the SM
Definition at line 249 of file DiskWriter.cc.
References dbFileHandler_, dwParams_, stor::DiskWritingParams::faultyEventsStream_, sharedResources_, and streamHandlers_.
Referenced by checkStreamChangeRequest().
{
  // A handler is only created when a faulty-events stream name is configured.
  if ( !dwParams_.faultyEventsStream_.empty() )
  {
    boost::shared_ptr<FaultyEventStreamHandler> faultyHandler(
      new FaultyEventStreamHandler(
        sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_
      )
    );
    streamHandlers_.push_back(
      boost::dynamic_pointer_cast<StreamHandler>(faultyHandler)
    );
  }
}
DiskWriter& stor::DiskWriter::operator= | ( | DiskWriter const & | ) | [private] |
void stor::DiskWriter::processEndOfLumiSection | ( | const I2OChain & | msg | ) | [private] |
Close files at the end of a luminosity section and release message memory.
Definition at line 314 of file DiskWriter.cc.
References dbFileHandler_, endOfRunReport_, stor::I2OChain::faulty(), stor::I2OChain::lumiSection(), stor::I2OChain::runNumber(), runNumber_, and streamHandlers_.
Referenced by writeNextEvent().
{
  // Faulty messages and messages from a different run are ignored.
  if ( msg.faulty() ) return;
  if ( msg.runNumber() != runNumber_ ) return;

  const uint32_t closedLumiSection = msg.lumiSection();

  // Every handler is asked to close its files for this lumi section and
  // may append to the shared record string.
  std::string dbRecord;
  bool anyFilesWritten = false;
  for ( StreamHandlers::const_iterator handler = streamHandlers_.begin(),
          handlersEnd = streamHandlers_.end();
        handler != handlersEnd;
        ++handler )
  {
    if ( (*handler)->closeFilesForLumiSection(closedLumiSection, dbRecord) )
    {
      anyFilesWritten = true;
    }
  }

  // Terminate the record with the end-of-lumi-section tag and write it out.
  dbRecord += "\tEoLS:1";
  dbFileHandler_->write(dbRecord);

  // Update the end-of-run bookkeeping.
  ++(endOfRunReport_->eolsCount);
  if ( anyFilesWritten )
  {
    ++(endOfRunReport_->lsCountWithFiles);
  }
  endOfRunReport_->updateLatestWrittenLumiSection(closedLumiSection);
}
void stor::DiskWriter::reportRemainingLumiSections | ( | ) | [private] |
Log file statistics for so far unreported lumi sections
Definition at line 293 of file DiskWriter.cc.
References dbFileHandler_, endOfRunReport_, stor::StreamsMonitorCollection::reportAllLumiSectionInfos(), and sharedResources_.
Referenced by destroyStreams().
{
  // Ask the streams monitor collection to report all lumi section infos,
  // passing it the DB file handler and the end-of-run report.
  sharedResources_->statisticsReporter_->getStreamsMonitorCollection()
    .reportAllLumiSectionInfos(dbFileHandler_, endOfRunReport_);
}
void stor::DiskWriter::startWorkLoop | ( | std::string | workloopName | ) |
Creates and starts the disk writing workloop
Definition at line 52 of file DiskWriter.cc.
References app_, Exception, stor::utils::getIdentifier(), runTheMatrix::msg, writeAction(), and writingWL_.
{
  try
  {
    // Workloop and action names are both prefixed with the application identifier.
    const std::string appId =
      utils::getIdentifier(app_->getApplicationDescriptor());

    writingWL_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(
      appId + workloopName, "waiting"
    );

    // An already active workloop is left untouched.
    if ( writingWL_->isActive() ) return;

    toolbox::task::ActionSignature* writeTask =
      toolbox::task::bind(this, &DiskWriter::writeAction,
                          appId + "WriteNextEvent");
    writingWL_->submit(writeTask);
    writingWL_->activate();
  }
  catch (xcept::Exception& e)
  {
    // Re-throw as a DiskWriting exception that wraps the original one.
    std::string failureText =
      "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
    XCEPT_RETHROW(stor::exception::DiskWriting, failureText, e);
  }
}
bool stor::DiskWriter::writeAction | ( | toolbox::task::WorkLoop * | ) |
The workloop action taking the next event from the StreamQueue and writing it to disk
Definition at line 79 of file DiskWriter.cc.
References actionIsActive_, exception, Exception, sharedResources_, and writeNextEvent().
Referenced by startWorkLoop().
{
  // Common prefix of every failure message raised below.
  std::string failureText = "Failed to write an event: ";

  // No exception leaves this method: each one is turned into a DiskWriting
  // exception and handed to moveToFailedState().
  try
  {
    writeNextEvent();
  }
  catch (xcept::Exception& e)
  {
    XCEPT_DECLARE_NESTED( stor::exception::DiskWriting,
                          sentinelException, failureText, e );
    sharedResources_->moveToFailedState(sentinelException);
  }
  catch (std::exception& e)
  {
    failureText.append(e.what());
    XCEPT_DECLARE( stor::exception::DiskWriting,
                   sentinelException, failureText );
    sharedResources_->moveToFailedState(sentinelException);
  }
  catch (...)
  {
    failureText.append("Unknown exception");
    XCEPT_DECLARE( stor::exception::DiskWriting,
                   sentinelException, failureText );
    sharedResources_->moveToFailedState(sentinelException);
  }

  // The return value is the activity flag, which the destructor clears.
  return actionIsActive_;
}
void stor::DiskWriter::writeEndOfRunMarker | ( | ) | [private] |
Log end-of-run marker
Definition at line 302 of file DiskWriter.cc.
References dbFileHandler_, and endOfRunReport_.
Referenced by destroyStreams().
{
  // Assemble the tab-separated end-of-run record from the report counters.
  std::ostringstream marker;
  marker << "LScount:" << endOfRunReport_->lsCountWithFiles;
  marker << "\tEoLScount:" << endOfRunReport_->eolsCount;
  marker << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten;
  marker << "\tEoR";

  dbFileHandler_->write(marker.str());

  // Reset the report once the marker has been written.
  endOfRunReport_->reset();
}
void stor::DiskWriter::writeEventToStreams | ( | const I2OChain & | event | ) | [private] |
Writes the event to the appropriate streams
Definition at line 148 of file DiskWriter.cc.
References runTheMatrix::msg, streamHandlers_, and hcal_dqm_sourceclient-file_cfg::streams.
Referenced by writeNextEvent().
{
  // Stream tags of the event; each tag is used as an index into streamHandlers_.
  // Binding by const reference avoids a copy if getStreamTags() returns a
  // reference and is equally valid if it returns by value.
  const std::vector<StreamID>& streamTags = event.getStreamTags();

  for ( std::vector<StreamID>::const_iterator tag = streamTags.begin(),
          tagsEnd = streamTags.end();
        tag != tagsEnd;
        ++tag )
  {
    // Only the handler lookup is guarded: a std::out_of_range thrown from
    // inside writeEvent() must not be reported as an unknown stream id.
    StreamHandlerPtr handler;
    try
    {
      handler = streamHandlers_.at(*tag);
    }
    catch (std::out_of_range& e)
    {
      std::ostringstream msg;
      msg << "Unable to retrieve stream handler for " << (*tag) << " : ";
      msg << e.what();
      XCEPT_RAISE(exception::UnknownStreamId, msg.str());
    }

    handler->writeEvent(event);
  }
}
void stor::DiskWriter::writeNextEvent | ( | ) | [private] |
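Takes the next event from the stream queue and writes it to the appropriate streams, or closes the files of a luminosity section if the message is an end-of-lumi-section message. If no event arrives within the timeout, it checks for a pending stream change request and closes timed-out files.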
Definition at line 112 of file DiskWriter.cc.
References checkForFileTimeOuts(), checkStreamChangeRequest(), event(), stor::utils::getCurrentTime(), stor::I2OChain::isEndOfLumiSectionMessage(), stor::I2OChain::memoryUsed(), processEndOfLumiSection(), sharedResources_, timeout_, and writeEventToStreams().
Referenced by writeAction().
{
  I2OChain event;
  StreamQueuePtr queue = sharedResources_->streamQueue_;

  // Time spent waiting on the queue is recorded as an idle sample in both branches.
  utils::TimePoint_t waitStart = utils::getCurrentTime();

  if ( !queue->deqTimedWait(event, timeout_) )
  {
    // Nothing was dequeued within timeout_: do the housekeeping.
    utils::Duration_t idleTime = utils::getCurrentTime() - waitStart;
    sharedResources_->statisticsReporter_->
      getThroughputMonitorCollection().addDiskWriterIdleSample(idleTime);

    checkStreamChangeRequest();
    checkForFileTimeOuts(true);
    sharedResources_->diskWriterResources_->setBusy(false);
    return;
  }

  // An event (or end-of-lumi-section message) was dequeued.
  sharedResources_->diskWriterResources_->setBusy(true);

  utils::Duration_t idleTime = utils::getCurrentTime() - waitStart;
  sharedResources_->statisticsReporter_->
    getThroughputMonitorCollection().addDiskWriterIdleSample(idleTime);
  sharedResources_->statisticsReporter_->
    getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());

  if ( event.isEndOfLumiSectionMessage() )
  {
    processEndOfLumiSection( event );
  }
  else
  {
    writeEventToStreams( event );
    checkForFileTimeOuts();
  }
}
bool stor::DiskWriter::actionIsActive_ [private] |
Definition at line 159 of file DiskWriter.h.
Referenced by writeAction(), and ~DiskWriter().
xdaq::Application* stor::DiskWriter::app_ [private] |
Definition at line 144 of file DiskWriter.h.
Referenced by startWorkLoop().
const DbFileHandlerPtr stor::DiskWriter::dbFileHandler_ [private] |
Definition at line 147 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), processEndOfLumiSection(), reportRemainingLumiSections(), and writeEndOfRunMarker().
DiskWritingParams stor::DiskWriter::dwParams_ [private] |
Definition at line 146 of file DiskWriter.h.
Referenced by checkForFileTimeOuts(), checkStreamChangeRequest(), and makeFaultyEventStream().
stor::DiskWriter::endOfRunReport_ [private] |
Definition at line 157 of file DiskWriter.h.
Referenced by processEndOfLumiSection(), reportRemainingLumiSections(), and writeEndOfRunMarker().
utils::TimePoint_t stor::DiskWriter::lastFileTimeoutCheckTime_ [private] |
Definition at line 151 of file DiskWriter.h.
Referenced by checkForFileTimeOuts().
unsigned int stor::DiskWriter::runNumber_ [private] |
Definition at line 149 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), and processEndOfLumiSection().
SharedResourcesPtr stor::DiskWriter::sharedResources_ [private] |
Definition at line 145 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), DiskWriter(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), reportRemainingLumiSections(), writeAction(), and writeNextEvent().
StreamHandlers stor::DiskWriter::streamHandlers_ [private] |
Definition at line 155 of file DiskWriter.h.
Referenced by closeTimedOutFiles(), destroyStreams(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), processEndOfLumiSection(), and writeEventToStreams().
boost::posix_time::time_duration stor::DiskWriter::timeout_ [private] |
Definition at line 150 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), DiskWriter(), and writeNextEvent().
toolbox::task::WorkLoop* stor::DiskWriter::writingWL_ [private] |
Definition at line 160 of file DiskWriter.h.
Referenced by startWorkLoop(), and ~DiskWriter().