#include <DiskWriter.h>
Writes events to disk
It gets the next event from the StreamQueue and writes it to the appropriate stream file(s) on disk.
Definition at line 42 of file DiskWriter.h.
typedef boost::shared_ptr<StreamHandler> stor::DiskWriter::StreamHandlerPtr [private] |
Definition at line 153 of file DiskWriter.h.
typedef std::vector<StreamHandlerPtr> stor::DiskWriter::StreamHandlers [private] |
Definition at line 154 of file DiskWriter.h.
stor::DiskWriter::DiskWriter | ( | xdaq::Application * | app, |
SharedResourcesPtr | sr | ||
) |
Definition at line 25 of file DiskWriter.cc.
References stor::WorkerThreadParams::DWdeqWaitTime_, sharedResources_, and timeout_.
: app_(app), sharedResources_(sr), dbFileHandler_(new DbFileHandler()), runNumber_(0), lastFileTimeoutCheckTime_(utils::getCurrentTime()), endOfRunReport_(new StreamsMonitorCollection::EndOfRunReport()), actionIsActive_(true) { WorkerThreadParams workerParams = sharedResources_->configuration_->getWorkerThreadParams(); timeout_ = workerParams.DWdeqWaitTime_; }
stor::DiskWriter::~DiskWriter | ( | ) |
Definition at line 40 of file DiskWriter.cc.
References actionIsActive_, destroyStreams(), and writingWL_.
{ // Stop the activity actionIsActive_ = false; // Cancel the workloop (will wait until the action has finished) writingWL_->cancel(); // Destroy any remaining streams. Under normal conditions, there should be none destroyStreams(); }
stor::DiskWriter::DiskWriter | ( | DiskWriter const & | ) | [private] |
void stor::DiskWriter::checkForFileTimeOuts | ( | const bool | doItNow = false | ) | [private] |
Close old files if fileClosingTestInterval has passed or do it now if argument is true
Definition at line 202 of file DiskWriter.cc.
References closeTimedOutFiles(), dwParams_, stor::DiskWritingParams::fileClosingTestInterval_, stor::utils::getCurrentTime(), lastFileTimeoutCheckTime_, and cmsPerfSuiteHarvest::now.
Referenced by writeNextEvent().
{ utils::TimePoint_t now = utils::getCurrentTime(); if (doItNow || (now - lastFileTimeoutCheckTime_) > dwParams_.fileClosingTestInterval_) { closeTimedOutFiles(now); lastFileTimeoutCheckTime_ = now; } }
void stor::DiskWriter::checkStreamChangeRequest | ( | ) | [private] |
Reconfigure streams if a request is pending
Definition at line 174 of file DiskWriter.cc.
References configureErrorStreams(), configureEventStreams(), dbFileHandler_, destroyStreams(), dwParams_, makeFaultyEventStream(), runNumber_, sharedResources_, and timeout_.
Referenced by writeNextEvent().
{ EvtStrConfigListPtr evtCfgList; ErrStrConfigListPtr errCfgList; DiskWritingParams newdwParams; unsigned int newRunNumber; boost::posix_time::time_duration newTimeoutValue; bool doConfig; if (sharedResources_->diskWriterResources_-> streamChangeRequested(doConfig, evtCfgList, errCfgList, newdwParams, newRunNumber, newTimeoutValue)) { destroyStreams(); if (doConfig) { dwParams_ = newdwParams; runNumber_ = newRunNumber; timeout_ = newTimeoutValue; dbFileHandler_->configure(runNumber_, dwParams_); makeFaultyEventStream(); configureEventStreams(evtCfgList); configureErrorStreams(errCfgList); } sharedResources_->diskWriterResources_->streamChangeDone(); } }
void stor::DiskWriter::closeTimedOutFiles | ( | const utils::TimePoint_t | now | ) | [private] |
Close all timed-out files
Definition at line 214 of file DiskWriter.cc.
References stor::StreamHandler::closeTimedOutFiles(), and streamHandlers_.
Referenced by checkForFileTimeOuts().
{ std::for_each(streamHandlers_.begin(), streamHandlers_.end(), boost::bind(&StreamHandler::closeTimedOutFiles, _1, now)); }
void stor::DiskWriter::configureErrorStreams | ( | ErrStrConfigListPtr | cfgList | ) | [private] |
Configures the error streams to be written to disk
Definition at line 236 of file DiskWriter.cc.
References makeErrorStream().
Referenced by checkStreamChangeRequest().
{ for ( ErrStrConfigList::iterator it = cfgList->begin(), itEnd = cfgList->end(); it != itEnd; ++it ) { makeErrorStream(*it); } }
void stor::DiskWriter::configureEventStreams | ( | EvtStrConfigListPtr | cfgList | ) | [private] |
Configures the event streams to be written to disk
Definition at line 221 of file DiskWriter.cc.
References makeEventStream().
Referenced by checkStreamChangeRequest().
{ for ( EvtStrConfigList::iterator it = cfgList->begin(), itEnd = cfgList->end(); it != itEnd; ++it ) { if ( it->fractionToDisk() > 0 ) makeEventStream(*it); } }
void stor::DiskWriter::destroyStreams | ( | ) | [private] |
Gracefully close all streams
Definition at line 281 of file DiskWriter.cc.
References stor::StreamHandler::closeAllFiles(), reportRemainingLumiSections(), streamHandlers_, and writeEndOfRunMarker().
Referenced by checkStreamChangeRequest(), and ~DiskWriter().
{ if (streamHandlers_.empty()) return; std::for_each(streamHandlers_.begin(), streamHandlers_.end(), boost::bind(&StreamHandler::closeAllFiles, _1)); streamHandlers_.clear(); reportRemainingLumiSections(); writeEndOfRunMarker(); }
void stor::DiskWriter::makeErrorStream | ( | ErrorStreamConfigurationInfo & | streamCfg | ) | [private] |
Creates the handler for the given error event stream
Definition at line 271 of file DiskWriter.cc.
References dbFileHandler_, stor::ErrorStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.
Referenced by configureErrorStreams().
{ boost::shared_ptr<FRDStreamHandler> newHandler( new FRDStreamHandler(streamCfg, sharedResources_, dbFileHandler_) ); streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler)); streamCfg.setStreamId(streamHandlers_.size() - 1); }
void stor::DiskWriter::makeEventStream | ( | EventStreamConfigurationInfo & | streamCfg | ) | [private] |
Creates the handler for the given event stream
Definition at line 261 of file DiskWriter.cc.
References dbFileHandler_, stor::EventStreamConfigurationInfo::setStreamId(), sharedResources_, and streamHandlers_.
Referenced by configureEventStreams().
{ boost::shared_ptr<EventStreamHandler> newHandler( new EventStreamHandler(streamCfg, sharedResources_, dbFileHandler_) ); streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler)); streamCfg.setStreamId(streamHandlers_.size() - 1); }
void stor::DiskWriter::makeFaultyEventStream | ( | ) | [private] |
Creates the handler for faulty events detected by the SM
Definition at line 250 of file DiskWriter.cc.
References dbFileHandler_, dwParams_, stor::DiskWritingParams::faultyEventsStream_, sharedResources_, and streamHandlers_.
Referenced by checkStreamChangeRequest().
{ if ( dwParams_.faultyEventsStream_.empty() ) return; boost::shared_ptr<FaultyEventStreamHandler> newHandler( new FaultyEventStreamHandler(sharedResources_, dbFileHandler_, dwParams_.faultyEventsStream_) ); streamHandlers_.push_back(boost::dynamic_pointer_cast<StreamHandler>(newHandler)); }
DiskWriter& stor::DiskWriter::operator= | ( | DiskWriter const & | ) | [private] |
void stor::DiskWriter::processEndOfLumiSection | ( | const I2OChain & | msg | ) | [private] |
Close files at the end of a luminosity section and release message memory:
Definition at line 315 of file DiskWriter.cc.
References dbFileHandler_, endOfRunReport_, stor::I2OChain::faulty(), stor::I2OChain::lumiSection(), stor::I2OChain::runNumber(), runNumber_, and streamHandlers_.
Referenced by writeNextEvent().
{ if ( msg.faulty() || msg.runNumber() != runNumber_ ) return; const uint32_t lumiSection = msg.lumiSection(); std::string fileCountStr; bool filesWritten = false; for (StreamHandlers::const_iterator it = streamHandlers_.begin(), itEnd = streamHandlers_.end(); it != itEnd; ++it) { if ( (*it)->closeFilesForLumiSection(lumiSection, fileCountStr) ) filesWritten = true; } fileCountStr += "\tEoLS:1"; dbFileHandler_->write(fileCountStr); ++(endOfRunReport_->eolsCount); if (filesWritten) ++(endOfRunReport_->lsCountWithFiles); endOfRunReport_->updateLatestWrittenLumiSection(lumiSection); }
void stor::DiskWriter::reportRemainingLumiSections | ( | ) | [private] |
Log file statistics for so far unreported lumi sections
Definition at line 294 of file DiskWriter.cc.
References dbFileHandler_, endOfRunReport_, stor::StreamsMonitorCollection::reportAllLumiSectionInfos(), and sharedResources_.
Referenced by destroyStreams().
{ StreamsMonitorCollection& smc = sharedResources_->statisticsReporter_->getStreamsMonitorCollection(); smc.reportAllLumiSectionInfos(dbFileHandler_, endOfRunReport_); }
void stor::DiskWriter::startWorkLoop | ( | std::string | workloopName | ) |
Creates and starts the disk writing workloop
Definition at line 53 of file DiskWriter.cc.
References app_, alignCSCRings::e, Exception, stor::utils::getIdentifier(), lumiQueryAPI::msg, writeAction(), and writingWL_.
{ try { std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor()); writingWL_ = toolbox::task::getWorkLoopFactory()-> getWorkLoop( identifier + workloopName, "waiting" ); if ( ! writingWL_->isActive() ) { toolbox::task::ActionSignature* processAction = toolbox::task::bind(this, &DiskWriter::writeAction, identifier + "WriteNextEvent"); writingWL_->submit(processAction); writingWL_->activate(); } } catch (xcept::Exception& e) { std::string msg = "Failed to start workloop 'DiskWriter' with 'writeNextEvent'."; XCEPT_RETHROW(stor::exception::DiskWriting, msg, e); } }
bool stor::DiskWriter::writeAction | ( | toolbox::task::WorkLoop * | ) |
The workloop action taking the next event from the StreamQueue and writing it to disk
Definition at line 80 of file DiskWriter.cc.
References actionIsActive_, alignCSCRings::e, exception, Exception, sharedResources_, and writeNextEvent().
Referenced by startWorkLoop().
{ std::string errorMsg = "Failed to write an event: "; try { writeNextEvent(); } catch(xcept::Exception &e) { XCEPT_DECLARE_NESTED( stor::exception::DiskWriting, sentinelException, errorMsg, e ); sharedResources_->alarmHandler_->moveToFailedState(sentinelException); } catch(std::exception &e) { errorMsg += e.what(); XCEPT_DECLARE( stor::exception::DiskWriting, sentinelException, errorMsg ); sharedResources_->alarmHandler_->moveToFailedState(sentinelException); } catch(...) { errorMsg += "Unknown exception"; XCEPT_DECLARE( stor::exception::DiskWriting, sentinelException, errorMsg ); sharedResources_->alarmHandler_->moveToFailedState(sentinelException); } return actionIsActive_; }
void stor::DiskWriter::writeEndOfRunMarker | ( | ) | [private] |
Log end-of-run marker
Definition at line 303 of file DiskWriter.cc.
References dbFileHandler_, and endOfRunReport_.
Referenced by destroyStreams().
{ std::ostringstream str; str << "LScount:" << endOfRunReport_->lsCountWithFiles << "\tEoLScount:" << endOfRunReport_->eolsCount << "\tLastLumi:" << endOfRunReport_->latestLumiSectionWritten << "\tEoR"; dbFileHandler_->write(str.str()); endOfRunReport_->reset(); }
void stor::DiskWriter::writeEventToStreams | ( | const I2OChain & | event | ) | [private] |
Writes the event to the appropriate streams
Definition at line 149 of file DiskWriter.cc.
References alignCSCRings::e, lumiQueryAPI::msg, streamHandlers_, and hcal_dqm_sourceclient-file_cfg::streams.
Referenced by writeNextEvent().
{ std::vector<StreamID> streams = event.getStreamTags(); for ( std::vector<StreamID>::const_iterator it = streams.begin(), itEnd = streams.end(); it != itEnd; ++it ) { try { streamHandlers_.at(*it)->writeEvent(event); } catch (std::out_of_range& e) { std::ostringstream msg; msg << "Unable to retrieve stream handler for " << (*it) << " : "; msg << e.what(); XCEPT_RAISE(exception::UnknownStreamId, msg.str()); } } }
void stor::DiskWriter::writeNextEvent | ( | ) | [private] |
Takes the event from the stream queue
Definition at line 113 of file DiskWriter.cc.
References checkForFileTimeOuts(), checkStreamChangeRequest(), event(), stor::utils::getCurrentTime(), stor::I2OChain::isEndOfLumiSectionMessage(), stor::I2OChain::memoryUsed(), processEndOfLumiSection(), sharedResources_, timeout_, and writeEventToStreams().
Referenced by writeAction().
{ I2OChain event; StreamQueuePtr sq = sharedResources_->streamQueue_; utils::TimePoint_t startTime = utils::getCurrentTime(); if (sq->deqTimedWait(event, timeout_)) { sharedResources_->diskWriterResources_->setBusy(true); utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime; sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime); sharedResources_->statisticsReporter_->getThroughputMonitorCollection().addPoppedEventSample(event.memoryUsed()); if( event.isEndOfLumiSectionMessage() ) { processEndOfLumiSection( event ); } else { writeEventToStreams( event ); checkForFileTimeOuts(); } } else { utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime; sharedResources_->statisticsReporter_-> getThroughputMonitorCollection().addDiskWriterIdleSample(elapsedTime); checkStreamChangeRequest(); checkForFileTimeOuts(true); sharedResources_->diskWriterResources_->setBusy(false); } }
bool stor::DiskWriter::actionIsActive_ [private] |
Definition at line 159 of file DiskWriter.h.
Referenced by writeAction(), and ~DiskWriter().
xdaq::Application* stor::DiskWriter::app_ [private] |
Definition at line 144 of file DiskWriter.h.
Referenced by startWorkLoop().
const DbFileHandlerPtr stor::DiskWriter::dbFileHandler_ [private] |
Definition at line 147 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), processEndOfLumiSection(), reportRemainingLumiSections(), and writeEndOfRunMarker().
DiskWritingParams stor::DiskWriter::dwParams_ [private] |
Definition at line 146 of file DiskWriter.h.
Referenced by checkForFileTimeOuts(), checkStreamChangeRequest(), and makeFaultyEventStream().
Definition at line 157 of file DiskWriter.h.
Referenced by processEndOfLumiSection(), reportRemainingLumiSections(), and writeEndOfRunMarker().
Definition at line 151 of file DiskWriter.h.
Referenced by checkForFileTimeOuts().
unsigned int stor::DiskWriter::runNumber_ [private] |
Definition at line 149 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), and processEndOfLumiSection().
Definition at line 145 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), DiskWriter(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), reportRemainingLumiSections(), writeAction(), and writeNextEvent().
Definition at line 155 of file DiskWriter.h.
Referenced by closeTimedOutFiles(), destroyStreams(), makeErrorStream(), makeEventStream(), makeFaultyEventStream(), processEndOfLumiSection(), and writeEventToStreams().
boost::posix_time::time_duration stor::DiskWriter::timeout_ [private] |
Definition at line 150 of file DiskWriter.h.
Referenced by checkStreamChangeRequest(), DiskWriter(), and writeNextEvent().
toolbox::task::WorkLoop* stor::DiskWriter::writingWL_ [private] |
Definition at line 160 of file DiskWriter.h.
Referenced by startWorkLoop(), and ~DiskWriter().