#include <DQMEventProcessor.h>
Public Member Functions | |
DQMEventProcessor (xdaq::Application *, SharedResourcesPtr sr) | |
bool | processDQMEvents (toolbox::task::WorkLoop *) |
void | startWorkLoop (std::string workloopName) |
~DQMEventProcessor () | |
Private Member Functions | |
DQMEventProcessor (DQMEventProcessor const &) | |
void | endOfRun () |
DQMEventProcessor & | operator= (DQMEventProcessor const &) |
void | processNextDQMEvent () |
Private Attributes | |
bool | actionIsActive_ |
xdaq::Application * | app_ |
unsigned int | discardDQMUpdatesForOlderLS_ |
DQMEventStore< I2OChain, InitMsgCollection, SharedResources > | dqmEventStore_ |
uint32_t | latestLumiSection_ |
toolbox::task::WorkLoop * | processWL_ |
SharedResourcesPtr | sharedResources_ |
boost::posix_time::time_duration | timeout_ |
Processes the DQM event (histograms)
It retrieves the next DQM event from the DQMEventQueue, adds up the histograms belonging to one lumi-section, and puts it into the appropriate DQMConsumerQueues. Depending on the configuration, it also writes the histograms to disk every N lumi-sections.
Definition at line 38 of file DQMEventProcessor.h.
stor::DQMEventProcessor::DQMEventProcessor | ( | xdaq::Application * | app, |
SharedResourcesPtr | sr | ||
) |
Definition at line 34 of file DQMEventProcessor.cc.
References stor::WorkerThreadParams::DQMEPdeqWaitTime_, sharedResources_, and timeout_.
// Constructor initializers, listed in the members' declaration order.
// The processor starts in the active state, with no lumi section seen yet
// and with discarding of old DQM updates disabled (threshold 0).
: app_(app),
  sharedResources_(sr),
  actionIsActive_(true),
  latestLumiSection_(0),
  discardDQMUpdatesForOlderLS_(0),
  // No workloop exists until startWorkLoop() assigns one; start from a null
  // pointer so the member never holds an indeterminate value.
  processWL_(0),
  dqmEventStore_
  (
    app->getApplicationDescriptor(),
    sr->dqmEventQueueCollection_,
    sr->statisticsReporter_->getDQMEventMonitorCollection(),
    sr->initMsgCollection_.get(),
    &stor::InitMsgCollection::maxMsgCount,
    sr.get(),
    &stor::SharedResources::moveToFailedState,
    sr->statisticsReporter_->alarmHandler()
  )
{
  // The initial dequeue timeout is taken from the worker thread parameters
  // of the current configuration.
  WorkerThreadParams workerParams =
    sharedResources_->configuration_->getWorkerThreadParams();
  timeout_ = workerParams.DQMEPdeqWaitTime_;
}
stor::DQMEventProcessor::~DQMEventProcessor | ( | ) |
Definition at line 58 of file DQMEventProcessor.cc.
References actionIsActive_, and processWL_.
{
  // Stop the activity: processDQMEvents() returns this flag to the workloop.
  actionIsActive_ = false;

  // processWL_ is only assigned in startWorkLoop(), so there may be no
  // workloop to cancel.
  // NOTE(review): this guard relies on processWL_ being null until
  // startWorkLoop() has run - confirm the constructor initializes it.
  if (processWL_ != 0)
  {
    // Cancel the workloop (will wait until the action has finished)
    processWL_->cancel();
  }
}
stor::DQMEventProcessor::DQMEventProcessor | ( | DQMEventProcessor const & | ) | [private] |
void stor::DQMEventProcessor::endOfRun | ( | ) | [private] |
Purge instances and process all completed DQM records
Definition at line 191 of file DQMEventProcessor.cc.
References dqmEventStore_, latestLumiSection_, and stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::purge().
Referenced by processNextDQMEvent().
{
  // Purge the DQM event store first ...
  dqmEventStore_.purge();

  // ... then restart the lumi-section bookkeeping from zero.
  latestLumiSection_ = 0;
}
DQMEventProcessor& stor::DQMEventProcessor::operator= | ( | DQMEventProcessor const & | ) | [private] |
bool stor::DQMEventProcessor::processDQMEvents | ( | toolbox::task::WorkLoop * | ) |
The workloop action: takes the next DQM event from the DQMEventQueue, processes it, and puts it into the appropriate DQMConsumerQueues when the lumi-section has finished.
Definition at line 95 of file DQMEventProcessor.cc.
References actionIsActive_, exception, Exception, processNextDQMEvent(), and sharedResources_.
Referenced by startWorkLoop().
{
  // Common prefix of every failure message; handlers append the details.
  std::string failureText = "Failed to process a DQM event: ";

  try
  {
    processNextDQMEvent();
  }
  catch(xcept::Exception &xdaqError)
  {
    // Keep the original exception as the nested cause.
    XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
                          failure, failureText, xdaqError );
    sharedResources_->moveToFailedState(failure);
  }
  catch(std::exception &stdError)
  {
    failureText.append(stdError.what());
    XCEPT_DECLARE( stor::exception::DQMEventProcessing,
                   failure, failureText );
    sharedResources_->moveToFailedState(failure);
  }
  catch(...)
  {
    failureText.append("Unknown exception");
    XCEPT_DECLARE( stor::exception::DQMEventProcessing,
                   failure, failureText );
    sharedResources_->moveToFailedState(failure);
  }

  // Cleared by the destructor; the value is handed back to the workloop.
  return actionIsActive_;
}
void stor::DQMEventProcessor::processNextDQMEvent | ( | ) | [private] |
Pops the next DQM event from the DQMEventQueue and adds it to the DQMEventStore
Definition at line 128 of file DQMEventProcessor.cc.
References stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::addDQMEvent(), stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::clear(), stor::DQMEventProcessorResources::Requests::configuration, stor::DQMProcessingParams::discardDQMUpdatesForOlderLS_, discardDQMUpdatesForOlderLS_, dqmEventStore_, endOfRun(), stor::DQMEventProcessorResources::Requests::endOfRun, python::Vispa::Plugins::EdmBrowser::EdmDataAccessor::eq(), stor::utils::getCurrentTime(), latestLumiSection_, max(), stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::setParameters(), sharedResources_, stor::DQMEventProcessorResources::Requests::storeDestruction, and timeout_.
Referenced by processDQMEvents().
{
  DQMEventQueue::ValueType nextEvent;
  DQMEventQueuePtr queue = sharedResources_->dqmEventQueue_;

  // Try to dequeue one event, using timeout_ as the wait limit. The time
  // spent in the call is recorded as idle time whether or not an event
  // was obtained.
  utils::TimePoint_t waitBegin = utils::getCurrentTime();
  const bool gotEvent = queue->deqTimedWait(nextEvent, timeout_);
  utils::Duration_t waited = utils::getCurrentTime() - waitBegin;
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
    addDQMEventProcessorIdleSample(waited);

  if (gotEvent)
  {
    // The threshold is added on the left-hand side rather than subtracted
    // on the right: subtracting unsigned quantities might not yield the
    // right result! A threshold of 0 disables discarding.
    const bool tooOld =
      (discardDQMUpdatesForOlderLS_ > 0) &&
      (nextEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ <
       latestLumiSection_);

    if (!tooOld)
    {
      sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
        addPoppedDQMEventSample(nextEvent.first.memoryUsed());
      sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
        getDroppedDQMEventCountsMQ().addSample(nextEvent.second);

      // Remember the highest lumi section seen so far.
      latestLumiSection_ =
        std::max(latestLumiSection_, nextEvent.first.lumiSection());
      dqmEventStore_.addDQMEvent(nextEvent.first);
    }
    else
    {
      // discard very old LS; the +1 presumably counts the event dropped here
      // on top of the count delivered with it.
      sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
        getDroppedDQMEventCountsMQ().addSample(nextEvent.second + 1);
    }
  }

  // Handle pending requests, if any.
  DQMEventProcessorResources::Requests pending;
  DQMProcessingParams newParams;
  boost::posix_time::time_duration newTimeout;
  if ( ! sharedResources_->dqmEventProcessorResources_->
       getRequests(pending, newParams, newTimeout) )
  {
    return;
  }

  if (pending.configuration)
  {
    // Take over the new timeout and DQM processing parameters.
    timeout_ = newTimeout;
    dqmEventStore_.setParameters(newParams);
    discardDQMUpdatesForOlderLS_ = newParams.discardDQMUpdatesForOlderLS_;
  }
  if (pending.endOfRun)
  {
    endOfRun();
  }
  if (pending.storeDestruction)
  {
    dqmEventStore_.clear();
  }

  // Signal that all requests retrieved above have been handled.
  sharedResources_->dqmEventProcessorResources_->requestsDone();
}
void stor::DQMEventProcessor::startWorkLoop | ( | std::string | workloopName | ) |
Creates and starts the DQM event processing workloop
Definition at line 68 of file DQMEventProcessor.cc.
References app_, Exception, stor::utils::getIdentifier(), runTheMatrix::msg, processDQMEvents(), and processWL_.
{
  try
  {
    // Workloop and action names are both prefixed with the application
    // identifier.
    const std::string prefix =
      utils::getIdentifier(app_->getApplicationDescriptor());

    processWL_ = toolbox::task::getWorkLoopFactory()->
      getWorkLoop( prefix + workloopName, "waiting" );

    // An already active workloop is left untouched.
    if ( processWL_->isActive() )
    {
      return;
    }

    // Bind processDQMEvents() as the workloop action and start the loop.
    toolbox::task::ActionSignature* action =
      toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
                          prefix + "ProcessNextDQMEvent");
    processWL_->submit(action);
    processWL_->activate();
  }
  catch (xcept::Exception& cause)
  {
    // Rethrow as a DQMEventProcessing exception carrying the cause.
    std::string reason = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
    XCEPT_RETHROW(stor::exception::DQMEventProcessing, reason, cause);
  }
}
bool stor::DQMEventProcessor::actionIsActive_ [private] |
Definition at line 82 of file DQMEventProcessor.h.
Referenced by processDQMEvents(), and ~DQMEventProcessor().
xdaq::Application* stor::DQMEventProcessor::app_ [private] |
Definition at line 78 of file DQMEventProcessor.h.
Referenced by startWorkLoop().
unsigned int stor::DQMEventProcessor::discardDQMUpdatesForOlderLS_ [private] |
Definition at line 84 of file DQMEventProcessor.h.
Referenced by processNextDQMEvent().
DQMEventStore<I2OChain,InitMsgCollection,SharedResources> stor::DQMEventProcessor::dqmEventStore_ [private] |
Definition at line 88 of file DQMEventProcessor.h.
Referenced by endOfRun(), and processNextDQMEvent().
uint32_t stor::DQMEventProcessor::latestLumiSection_ [private] |
Definition at line 83 of file DQMEventProcessor.h.
Referenced by endOfRun(), and processNextDQMEvent().
toolbox::task::WorkLoop* stor::DQMEventProcessor::processWL_ [private] |
Definition at line 86 of file DQMEventProcessor.h.
Referenced by startWorkLoop(), and ~DQMEventProcessor().
SharedResourcesPtr stor::DQMEventProcessor::sharedResources_ [private] |
Definition at line 79 of file DQMEventProcessor.h.
Referenced by DQMEventProcessor(), processDQMEvents(), and processNextDQMEvent().
boost::posix_time::time_duration stor::DQMEventProcessor::timeout_ [private] |
Definition at line 81 of file DQMEventProcessor.h.
Referenced by DQMEventProcessor(), and processNextDQMEvent().