#include <DiskWriter.h>
Writes events to disk.
It gets the next event from the StreamQueue and writes it to the appropriate stream file(s) on disk.
Definition at line 41 of file DiskWriter.h.
typedef boost::shared_ptr<StreamHandler> stor::DiskWriter::StreamHandlerPtr [private] |
Definition at line 152 of file DiskWriter.h.
typedef std::vector<StreamHandlerPtr> stor::DiskWriter::StreamHandlers [private] |
Definition at line 153 of file DiskWriter.h.
stor::DiskWriter::DiskWriter | ( | xdaq::Application * | app, |
SharedResourcesPtr | sr | ||
) |
Definition at line 24 of file DiskWriter.cc.
References stor::WorkerThreadParams::DWdeqWaitTime_, sharedResources_, and timeout_.
: app_(app),
  sharedResources_(sr),
  dbFileHandler_(new DbFileHandler()),
  runNumber_(0),
  lastFileTimeoutCheckTime_(utils::getCurrentTime()),
  actionIsActive_(true),
  // No workloop exists yet: writingWL_ is only assigned in startWorkLoop().
  // Give the pointer a well-defined value instead of leaving it uninitialized.
  writingWL_(0)
{
  // Seed the dequeue timeout from the configured worker-thread parameters.
  // checkStreamChangeRequest() overwrites it when a reconfiguration arrives.
  WorkerThreadParams workerParams =
    sharedResources_->configuration_->getWorkerThreadParams();
  timeout_ = workerParams.DWdeqWaitTime_;
}
stor::DiskWriter::~DiskWriter | ( | ) |
Definition at line 38 of file DiskWriter.cc.
References actionIsActive_, destroyStreams(), and writingWL_.
{
  // Stop the activity
  actionIsActive_ = false;

  // Cancel the workloop (will wait until the action has finished).
  // writingWL_ is only assigned in startWorkLoop(), so do not dereference
  // it when no workloop has been obtained.
  if (writingWL_)
  {
    writingWL_->cancel();
  }

  // Destroy any remaining streams. Under normal conditions, there should be none
  destroyStreams();
}
stor::DiskWriter::DiskWriter | ( | DiskWriter const & | ) | [private] |
void stor::DiskWriter::checkForFileTimeOuts | ( | const bool | doItNow = false | ) | [private] |
Close old files if fileClosingTestInterval has passed or do it now if argument is true
Definition at line 200 of file DiskWriter.cc.
References closeTimedOutFiles(), dwParams_, stor::DiskWritingParams::fileClosingTestInterval_, stor::utils::getCurrentTime(), lastFileTimeoutCheckTime_, and cmsPerfSuiteHarvest::now.
Referenced by writeNextEvent().
{
  // Sample the clock once so that the interval test and the bookkeeping
  // below refer to the same instant.
  const utils::TimePoint_t currentTime = utils::getCurrentTime();

  // A check is due when forced by the caller or when the configured
  // test interval has elapsed since the previous check.
  const bool checkIsDue =
    doItNow ||
    (currentTime - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_;
  if (!checkIsDue) return;

  closeTimedOutFiles(currentTime);
  lastFileTimeoutCheckTime_ = currentTime;
}
void stor::DiskWriter::checkStreamChangeRequest | ( | ) | [private] |
Reconfigure streams if a request is pending
Definition at line 172 of file DiskWriter.cc.
References configureErrorStreams(), configureEventStreams(), dbFileHandler_, destroyStreams(), dwParams_, makeFaultyEventStream(), runNumber_, sharedResources_, and timeout_.
Referenced by writeNextEvent().
{
  // Receivers handed to streamChangeRequested()
  EvtStrConfigListPtr eventConfigs;
  ErrStrConfigListPtr errorConfigs;
  DiskWritingParams requestedParams;
  unsigned int requestedRunNumber;
  boost::posix_time::time_duration requestedTimeout;
  bool reconfigure;

  const bool changePending =
    sharedResources_->diskWriterResources_->streamChangeRequested(
      reconfigure, eventConfigs, errorConfigs,
      requestedParams, requestedRunNumber, requestedTimeout);

  // Nothing to do unless a request is pending
  if (!changePending) return;

  // Any pending request first tears down the existing streams
  destroyStreams();

  if (reconfigure)
  {
    // Adopt the new settings before creating any handler that reads them
    dwParams_ = requestedParams;
    runNumber_ = requestedRunNumber;
    timeout_ = requestedTimeout;
    dbFileHandler_->configure(runNumber_, dwParams_);

    makeFaultyEventStream();
    configureEventStreams(eventConfigs);
    configureErrorStreams(errorConfigs);
  }

  // Signal that the request has been processed
  sharedResources_->diskWriterResources_->streamChangeDone();
}
void stor::DiskWriter::closeTimedOutFiles | ( | const utils::TimePoint_t | now | ) | [private] |
Close all timed-out files
Definition at line 212 of file DiskWriter.cc.
References stor::StreamHandler::closeTimedOutFiles(), and streamHandlers_.
Referenced by checkForFileTimeOuts().
{
  // Forward the given time to every stream handler so that each one can
  // close its own timed-out files.
  for (StreamHandlers::const_iterator handler = streamHandlers_.begin(),
         handlerEnd = streamHandlers_.end();
       handler != handlerEnd;
       ++handler)
  {
    (*handler)->closeTimedOutFiles(now);
  }
}
void stor::DiskWriter::configureErrorStreams | ( | ErrStrConfigListPtr | cfgList | ) | [private] |
Configures the error streams to be written to disk
Definition at line 234 of file DiskWriter.cc.
References makeErrorStream().
Referenced by checkStreamChangeRequest().
{
  // Create one handler per error stream configuration. The entries are
  // passed by non-const reference because makeErrorStream() stores the
  // assigned stream id back into them.
  ErrStrConfigList::iterator cfg = cfgList->begin();
  const ErrStrConfigList::iterator cfgEnd = cfgList->end();
  while (cfg != cfgEnd)
  {
    makeErrorStream(*cfg);
    ++cfg;
  }
}
void stor::DiskWriter::configureEventStreams | ( | EvtStrConfigListPtr | cfgList | ) | [private] |
Configures the event streams to be written to disk
Definition at line 219 of file DiskWriter.cc.
References makeEventStream().
Referenced by checkStreamChangeRequest().
{
  // Create a handler for every event stream configuration that requests
  // a positive fraction of its events on disk; other entries are skipped.
  EvtStrConfigList::iterator cfg = cfgList->begin();
  const EvtStrConfigList::iterator cfgEnd = cfgList->end();
  while (cfg != cfgEnd)
  {
    if (cfg->fractionToDisk() > 0)
    {
      makeEventStream(*cfg);
    }
    ++cfg;
  }
}
void stor::DiskWriter::destroyStreams | ( | ) | [private] |
Gracefully close all streams
Definition at line 279 of file DiskWriter.cc.
References stor::StreamHandler::closeAllFiles(), reportRemainingLumiSections(), streamHandlers_, and writeEndOfRunMarker().
Referenced by checkStreamChangeRequest(), and ~DiskWriter().
{
  // Without any stream there is nothing to close and nothing to report
  if (!streamHandlers_.empty())
  {
    // Close the files of every handler before dropping the handlers
    for (StreamHandlers::const_iterator handler = streamHandlers_.begin(),
           handlerEnd = streamHandlers_.end();
         handler != handlerEnd;
         ++handler)
    {
      (*handler)->closeAllFiles();
    }
    streamHandlers_.clear();

    // Final bookkeeping: outstanding lumi sections, then the end-of-run marker
    reportRemainingLumiSections();
    writeEndOfRunMarker();
  }
}
void stor::DiskWriter::makeErrorStream | ( | ErrorStreamConfigurationInfo & | streamCfg | ) | [private] |
Creates the handler for the given error event stream
Definition at line 269 of file DiskWriter.cc.
References dbFileHandler_, stor::ErrorStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.
Referenced by configureErrorStreams().
{
  // Build the handler from the configuration as it is on entry
  const boost::shared_ptr<FRDStreamHandler> errorHandler(
    new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_));

  streamHandlers_.push_back(
    boost::dynamic_pointer_cast<StreamHandler>(errorHandler));

  // The stream id is the position of the handler just appended
  const StreamHandlers::size_type lastIndex = streamHandlers_.size() - 1;
  streamCfg.setStreamId(lastIndex);
}
void stor::DiskWriter::makeEventStream | ( | EventStreamConfigurationInfo & | streamCfg | ) | [private] |
Creates the handler for the given event stream
Definition at line 259 of file DiskWriter.cc.
References dbFileHandler_, stor::EventStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.
Referenced by configureEventStreams().
{
  // Build the handler from the configuration as it is on entry
  const boost::shared_ptr<EventStreamHandler> eventHandler(
    new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_));

  streamHandlers_.push_back(
    boost::dynamic_pointer_cast<StreamHandler>(eventHandler));

  // The stream id is the position of the handler just appended
  const StreamHandlers::size_type lastIndex = streamHandlers_.size() - 1;
  streamCfg.setStreamId(lastIndex);
}
void stor::DiskWriter::makeFaultyEventStream | ( | ) | [private] |
Creates the handler for faulty events detected by the SM
Definition at line 248 of file DiskWriter.cc.
References dbFileHandler_, dwParams_, stor::DiskWritingParams::faultyEventsStream_, sharedResources_, and streamHandlers_.
Referenced by checkStreamChangeRequest().
{
  // A handler is only created when a faulty-events stream name is configured
  const bool streamConfigured = !dwParams_.faultyEventsStream_.empty();
  if (streamConfigured)
  {
    const boost::shared_ptr<FaultyEventStreamHandler> faultyHandler(
      new FaultyEventStreamHandler(sharedResources_,
                                   dbFileHandler_,
                                   dwParams_.faultyEventsStream_));

    streamHandlers_.push_back(
      boost::dynamic_pointer_cast<StreamHandler>(faultyHandler));
  }
}
DiskWriter& stor::DiskWriter::operator= | ( | DiskWriter const & | ) | [private] |
void stor::DiskWriter::processEndOfLumiSection | ( | const I2OChain & | msg | ) | [private] |
Close files at the end of a luminosity section and release message memory.
Definition at line 322 of file DiskWriter.cc.
References dbFileHandler_, stor::I2OChain::faulty(), stor::I2OChain::lumiSection(), stor::I2OChain::runNumber(), runNumber_, and streamHandlers_.
Referenced by writeNextEvent().
{
  // Ignore faulty messages and messages that belong to a different run
  if (msg.faulty()) return;
  if (msg.runNumber() != runNumber_) return;

  const uint32_t lumiSection = msg.lumiSection();

  // Each handler closes its files for this lumi section; the same string
  // is handed to all of them and written to the database file afterwards.
  std::string fileCountStr;
  StreamHandlers::const_iterator handler = streamHandlers_.begin();
  const StreamHandlers::const_iterator handlerEnd = streamHandlers_.end();
  while (handler != handlerEnd)
  {
    (*handler)->closeFilesForLumiSection(lumiSection, fileCountStr);
    ++handler;
  }

  dbFileHandler_->write(fileCountStr);
}
void stor::DiskWriter::reportRemainingLumiSections | ( | ) | const [private] |
Log file statistics for so far unreported lumi sections
Definition at line 292 of file DiskWriter.cc.
References dbFileHandler_, stor::StreamsMonitorCollection::reportAllLumiSectionInfos(), and sharedResources_.
Referenced by destroyStreams().
{
  // Delegate to the streams monitor collection, which reports all lumi
  // section infos through the database file handler.
  sharedResources_->statisticsReporter_->getStreamsMonitorCollection()
    .reportAllLumiSectionInfos(dbFileHandler_);
}
void stor::DiskWriter::startWorkLoop | ( | std::string | workloopName | ) |
Creates and starts the disk writing workloop
Definition at line 51 of file DiskWriter.cc.
References app_, ExpressReco_HICollisions_FallBack::e, Exception, stor::utils::getIdentifier(), runTheMatrix::msg, writeAction(), and writingWL_.
{
  try
  {
    // Workloop and action names are both prefixed with the application identifier
    const std::string identifier =
      utils::getIdentifier(app_->getApplicationDescriptor());

    writingWL_ = toolbox::task::getWorkLoopFactory()->
      getWorkLoop(identifier + workloopName, "waiting");

    // A workloop that is already active is left untouched
    if (writingWL_->isActive()) return;

    toolbox::task::ActionSignature* writeSignature =
      toolbox::task::bind(this, &DiskWriter::writeAction,
                          identifier + "WriteNextEvent");
    writingWL_->submit(writeSignature);
    writingWL_->activate();
  }
  catch (xcept::Exception& cause)
  {
    // Re-raise as a DiskWriting exception, keeping the original as nested cause
    std::string failureText =
      "Failed to start workloop 'DiskWriter' with 'writeNextEvent'.";
    XCEPT_RETHROW(stor::exception::DiskWriting, failureText, cause);
  }
}
bool stor::DiskWriter::writeAction | ( | toolbox::task::WorkLoop * | ) |
The workloop action taking the next event from the StreamQueue and writing it to disk
Definition at line 78 of file DiskWriter.cc.
References actionIsActive_, ExpressReco_HICollisions_FallBack::e, exception, Exception, sharedResources_, and writeNextEvent().
Referenced by startWorkLoop().
{
  // Common prefix of the message attached to any failure below
  std::string failureText = "Failed to write an event: ";

  try
  {
    writeNextEvent();
  }
  catch (xcept::Exception& cause)
  {
    // Keep the original exception as nested cause
    XCEPT_DECLARE_NESTED(stor::exception::DiskWriting,
                         failure, failureText, cause);
    sharedResources_->moveToFailedState(failure);
  }
  catch (std::exception& cause)
  {
    failureText.append(cause.what());
    XCEPT_DECLARE(stor::exception::DiskWriting,
                  failure, failureText);
    sharedResources_->moveToFailedState(failure);
  }
  catch (...)
  {
    failureText.append("Unknown exception");
    XCEPT_DECLARE(stor::exception::DiskWriting,
                  failure, failureText);
    sharedResources_->moveToFailedState(failure);
  }

  // The return value is the activity flag, which the destructor clears
  return actionIsActive_;
}
void stor::DiskWriter::writeEndOfRunMarker | ( | ) | const [private] |
Log end-of-run marker
Definition at line 301 of file DiskWriter.cc.
References stor::MonitorCollection::calculateStatistics(), dbFileHandler_, stor::utils::getCurrentTime(), stor::RunMonitorCollection::getEoLSSeenMQ(), stor::MonitoredQuantity::Stats::getLastSampleValue(), stor::RunMonitorCollection::getLumiSectionsSeenMQ(), stor::MonitoredQuantity::Stats::getSampleCount(), stor::MonitoredQuantity::getStats(), and sharedResources_.
Referenced by destroyStreams().
{
  RunMonitorCollection& runMonitors =
    sharedResources_->statisticsReporter_->getRunMonitorCollection();

  // Make sure we report the latest values
  runMonitors.calculateStatistics(utils::getCurrentTime());

  MonitoredQuantity::Stats lumiStats;
  runMonitors.getLumiSectionsSeenMQ().getStats(lumiStats);

  MonitoredQuantity::Stats eolsStats;
  runMonitors.getEoLSSeenMQ().getStats(eolsStats);

  // Assemble the tab-separated end-of-run record field by field
  std::ostringstream marker;
  marker << "LScount:" << lumiStats.getSampleCount();
  marker << "\tEoLScount:" << eolsStats.getSampleCount();
  marker << "\tLastLumi:" << lumiStats.getLastSampleValue();
  marker << "\tEoR";

  dbFileHandler_->write(marker.str());
}
void stor::DiskWriter::writeEventToStreams | ( | const I2OChain & | event | ) | [private] |
Writes the event to the appropriate streams
Definition at line 147 of file DiskWriter.cc.
References ExpressReco_HICollisions_FallBack::e, runTheMatrix::msg, streamHandlers_, and hcal_dqm_sourceclient-file_cfg::streams.
Referenced by writeNextEvent().
{
  // The event's stream tags are used as indices into streamHandlers_
  const std::vector<StreamID> streamTags = event.getStreamTags();

  for (std::vector<StreamID>::size_type idx = 0; idx < streamTags.size(); ++idx)
  {
    const StreamID& tag = streamTags[idx];
    try
    {
      // at() throws std::out_of_range when no handler exists for the tag
      streamHandlers_.at(tag)->writeEvent(event);
    }
    catch (std::out_of_range& e)
    {
      std::ostringstream description;
      description << "Unable to retrieve stream handler for " << tag << " : "
                  << e.what();
      XCEPT_RAISE(exception::UnknownStreamId, description.str());
    }
  }
}
void stor::DiskWriter::writeNextEvent | ( | ) | [private] |
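Takes the next event from the stream queue and writes it to the appropriate streams, or processes it as an end-of-lumi-section message. If no event arrives within the timeout, checks for a pending stream change request and closes timed-out files.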
Definition at line 111 of file DiskWriter.cc.
References checkForFileTimeOuts(), checkStreamChangeRequest(), event(), stor::utils::getCurrentTime(), stor::I2OChain::isEndOfLumiSectionMessage(), stor::I2OChain::memoryUsed(), processEndOfLumiSection(), sharedResources_, timeout_, and writeEventToStreams().
Referenced by writeAction().
{
  I2OChain event;
  StreamQueuePtr streamQueue = sharedResources_->streamQueue_;

  // Time spent in the dequeue call is recorded as disk writer idle time
  const utils::TimePoint_t waitStart = utils::getCurrentTime();
  const bool gotEvent = streamQueue->deqTimedWait(event, timeout_);

  if (!gotEvent)
  {
    // Timed out: record the idle time and use the gap for housekeeping
    const utils::Duration_t idleTime = utils::getCurrentTime() - waitStart;
    sharedResources_->statisticsReporter_->
      getThroughputMonitorCollection().addDiskWriterIdleSample(idleTime);

    checkStreamChangeRequest();
    checkForFileTimeOuts(true);
    sharedResources_->diskWriterResources_->setBusy(false);
    return;
  }

  sharedResources_->diskWriterResources_->setBusy(true);

  const utils::Duration_t idleTime = utils::getCurrentTime() - waitStart;
  sharedResources_->statisticsReporter_->
    getThroughputMonitorCollection().addDiskWriterIdleSample(idleTime);
  sharedResources_->statisticsReporter_->
    getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed());

  if (event.isEndOfLumiSectionMessage())
  {
    processEndOfLumiSection(event);
  }
  else
  {
    writeEventToStreams(event);
    checkForFileTimeOuts();
  }
}
bool stor::DiskWriter::actionIsActive_ [private] |
Definition at line 156 of file DiskWriter.h.
Referenced by writeAction(), and ~DiskWriter().
xdaq::Application* stor::DiskWriter::app_ [private] |
Definition at line 143 of file DiskWriter.h.
Referenced by startWorkLoop().
const DbFileHandlerPtr stor::DiskWriter::dbFileHandler_ [private] |
Definition at line 146 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), processEndOfLumiSection(), reportRemainingLumiSections(), and writeEndOfRunMarker().
DiskWritingParams stor::DiskWriter::dwParams_ [private] |
Definition at line 145 of file DiskWriter.h.
Referenced by checkForFileTimeOuts(), checkStreamChangeRequest(), and makeFaultyEventStream().
utils::TimePoint_t stor::DiskWriter::lastFileTimeoutCheckTime_ [private] |
Definition at line 150 of file DiskWriter.h.
Referenced by checkForFileTimeOuts().
unsigned int stor::DiskWriter::runNumber_ [private] |
Definition at line 148 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), and processEndOfLumiSection().
SharedResourcesPtr stor::DiskWriter::sharedResources_ [private] |
Definition at line 144 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), DiskWriter(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), reportRemainingLumiSections(), writeAction(), writeEndOfRunMarker(), and writeNextEvent().
StreamHandlers stor::DiskWriter::streamHandlers_ [private] |
Definition at line 154 of file DiskWriter.h.
Referenced by closeTimedOutFiles(), destroyStreams(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), processEndOfLumiSection(), and writeEventToStreams().
boost::posix_time::time_duration stor::DiskWriter::timeout_ [private] |
Definition at line 149 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), DiskWriter(), and writeNextEvent().
toolbox::task::WorkLoop* stor::DiskWriter::writingWL_ [private] |
Definition at line 157 of file DiskWriter.h.
Referenced by startWorkLoop(), and ~DiskWriter().