#include <FragmentProcessor.h>
Public Member Functions | |
FragmentProcessor (xdaq::Application *app, SharedResourcesPtr sr) | |
bool | processMessages (toolbox::task::WorkLoop *) |
void | startWorkLoop (std::string workloopName) |
~FragmentProcessor () | |
Private Member Functions | |
void | processAllCommands () |
void | processAllRegistrations () |
void | processOneFragment () |
void | processOneFragmentIfPossible () |
Private Attributes | |
bool | actionIsActive_ |
xdaq::Application * | app_ |
EventDistributor | eventDistributor_ |
FragmentStore | fragmentStore_ |
toolbox::task::WorkLoop * | processWL_ |
SharedResourcesPtr | sharedResources_ |
StateMachinePtr | stateMachine_ |
boost::posix_time::time_duration | timeout_ |
WrapperNotifier | wrapperNotifier_ |
Processes I2O event fragments.
The processor pops the next fragment from the FragmentQueue and adds it to the FragmentStore. If that fragment completes an event, the processor hands the event to the EventDistributor.
Definition at line 40 of file FragmentProcessor.h.
FragmentProcessor::FragmentProcessor | ( | xdaq::Application * | app, |
SharedResourcesPtr | sr | ||
) |
Definition at line 20 of file FragmentProcessor.cc.
References eventDistributor_, stor::WorkerThreadParams::FPdeqWaitTime_, fragmentStore_, sharedResources_, stateMachine_, timeout_, and wrapperNotifier_.
// Builds the fragment processor: stores the application and shared resources,
// constructs the owned helpers, creates and initiates the state machine, and
// caches the fragment-queue dequeue timeout from the worker thread parameters.
//
// app  owning XDAQ application; kept in app_ and handed to wrapperNotifier_
// sr   shared resources; its configuration_ supplies the fragment store memory
//      limit and the worker thread parameters read below
: app_(app),
  sharedResources_(sr),
  wrapperNotifier_( app ),
  fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
  eventDistributor_(sr),
  actionIsActive_(true),
  // processWL_ is otherwise assigned only by startWorkLoop(), while the
  // destructor reads it unconditionally; start from a null pointer so it
  // never holds an indeterminate value.
  processWL_(0)
{
  // The state machine works on the members constructed above, so it can only
  // be created once the initialiser list has run.
  stateMachine_.reset( new StateMachine( &eventDistributor_,
                                         &fragmentStore_,
                                         &wrapperNotifier_,
                                         sharedResources_ ) );
  stateMachine_->initiate();

  // Initial dequeue timeout; processAllCommands() refreshes it after commands
  // have been processed.
  WorkerThreadParams workerParams =
    sharedResources_->configuration_->getWorkerThreadParams();
  timeout_ = workerParams.FPdeqWaitTime_;
}
FragmentProcessor::~FragmentProcessor | ( | ) |
Definition at line 39 of file FragmentProcessor.cc.
References actionIsActive_, and processWL_.
// Stops the workloop action and cancels the workloop.
{
  // Stop the activity: processMessages() returns this flag to the workloop.
  actionIsActive_ = false;

  // processWL_ is assigned only by startWorkLoop(); there is nothing to cancel
  // when no workloop was ever obtained.
  // NOTE(review): the guard is effective only if processWL_ is null-initialised
  // at construction - confirm.
  if ( processWL_ )
  {
    try
    {
      // Cancel the workloop (will wait until the action has finished)
      processWL_->cancel();
    }
    catch (...)
    {
      // A destructor must not let an exception escape: during stack unwinding
      // that would terminate the process. Nothing more can be done here.
    }
  }
}
void FragmentProcessor::processAllCommands | ( | ) | [private] |
Processes all state machine events in the command queue
Definition at line 162 of file FragmentProcessor.cc.
References stor::WorkerThreadParams::FPdeqWaitTime_, sharedResources_, stateMachine_, and timeout_.
Referenced by processMessages().
// Drains the command queue, feeding every dequeued event to the state machine,
// and afterwards re-reads the dequeue timeout if at least one command was seen.
{
  CommandQueuePtr commands = sharedResources_->commandQueue_;
  stor::EventPtr_t pending;

  // Nothing queued: leave the cached timeout untouched.
  if ( ! commands->deqNowait( pending ) )
    return;

  do
  {
    stateMachine_->process_event( *pending );
  }
  while ( commands->deqNowait( pending ) );

  // The timeout value may have changed if the transition was a Configuration
  // transition, so pick up the current value.
  WorkerThreadParams refreshed =
    sharedResources_->configuration_->getWorkerThreadParams();
  timeout_ = refreshed.FPdeqWaitTime_;
}
void FragmentProcessor::processAllRegistrations | ( | ) | [private] |
Processes all consumer registrations in the registration queue
Definition at line 185 of file FragmentProcessor.cc.
References eventDistributor_, and sharedResources_.
Referenced by processMessages().
// Drains the registration queue; each dequeued registration registers itself
// with the event distributor.
{
  RegPtr registration;
  RegistrationQueuePtr pendingRegistrations = sharedResources_->registrationQueue_;

  for ( ; pendingRegistrations->deqNowait( registration ); )
  {
    registration->registerMe( &eventDistributor_ );
  }
}
bool FragmentProcessor::processMessages | ( | toolbox::task::WorkLoop * | ) |
The workloop action: it processes state machine commands from the command queue, processes consumer registrations, and handles I2O messages retrieved from the FragmentQueue.
Definition at line 74 of file FragmentProcessor.cc.
References actionIsActive_, alignCSCRings::e, stor::AlarmHandler::ERROR, exception, Exception, processAllCommands(), processAllRegistrations(), processOneFragmentIfPossible(), sharedResources_, and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by startWorkLoop().
// One pass of the workloop action: process queued commands, then consumer
// registrations, then at most one fragment. Failures are reported through the
// alarm handler; the return value is the actionIsActive_ flag.
{
  // Names the step in progress so that a failure can be attributed to it.
  std::string failureContext;

  try
  {
    failureContext = "Failed to process state machine events: ";
    processAllCommands();

    failureContext = "Failed to process consumer registrations: ";
    processAllRegistrations();

    failureContext = "Failed to process an event fragment: ";
    processOneFragmentIfPossible();
  }
  catch(stor::exception::RBLookupFailed &lookupFailure)
  {
    // Reported to the sentinel as an error; no move to the failed state.
    sharedResources_->alarmHandler_->
      notifySentinel(AlarmHandler::ERROR, lookupFailure);
  }
  catch(xcept::Exception &xceptFailure)
  {
    XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
                          wrappedFailure, failureContext, xceptFailure );
    sharedResources_->alarmHandler_->moveToFailedState(wrappedFailure);
  }
  catch(std::exception &stdFailure)
  {
    failureContext += stdFailure.what();
    XCEPT_DECLARE( stor::exception::FragmentProcessing,
                   wrappedFailure, failureContext );
    sharedResources_->alarmHandler_->moveToFailedState(wrappedFailure);
  }
  catch(...)
  {
    failureContext += "Unknown exception";
    XCEPT_DECLARE( stor::exception::FragmentProcessing,
                   wrappedFailure, failureContext );
    sharedResources_->alarmHandler_->moveToFailedState(wrappedFailure);
  }

  return actionIsActive_;
}
void FragmentProcessor::processOneFragment | ( | ) | [private] |
Process a single fragment. This should only be called if it has already been determined there is a place to put it.
Definition at line 136 of file FragmentProcessor.cc.
References stor::utils::getCurrentTime(), stor::I2OChain::memoryUsed(), sharedResources_, stateMachine_, and timeout_.
Referenced by processOneFragmentIfPossible().
// Waits up to timeout_ for a fragment on the fragment queue, records the time
// spent waiting as an idle sample, and passes the outcome (a fragment, or the
// absence of one) to the current state of the state machine.
{
  I2OChain chain;
  FragmentQueuePtr queue = sharedResources_->fragmentQueue_;

  const utils::TimePoint_t waitBegin = utils::getCurrentTime();
  const bool dequeued = queue->deqTimedWait(chain, timeout_);
  const utils::Duration_t waited = utils::getCurrentTime() - waitBegin;

  // The idle sample is recorded whether or not a fragment arrived.
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
    addFragmentProcessorIdleSample(waited);

  if ( ! dequeued )
  {
    stateMachine_->getCurrentState().noFragmentToProcess();
    return;
  }

  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
    addPoppedFragmentSample(chain.memoryUsed());
  stateMachine_->getCurrentState().processI2OFragment(chain);
}
void FragmentProcessor::processOneFragmentIfPossible | ( | ) | [private] |
Process a single fragment if there is a place to put it; otherwise sleep for the timeout period.
Definition at line 118 of file FragmentProcessor.cc.
References stor::FragmentStore::addToStaleEventTimes(), eventDistributor_, fragmentStore_, stor::FragmentStore::full(), stor::EventDistributor::full(), stor::utils::getCurrentTime(), processOneFragment(), sharedResources_, stor::utils::sleep(), and timeout_.
Referenced by processMessages().
// Processes one fragment when neither the fragment store nor the event
// distributor reports full. Otherwise sleeps for timeout_, books the elapsed
// time as an idle sample, and adds it to the fragment store's stale event times.
{
  if ( ! fragmentStore_.full() && ! eventDistributor_.full() )
  {
    processOneFragment();
    return;
  }

  // No room downstream: back off for one timeout period.
  const utils::TimePoint_t sleepBegin = utils::getCurrentTime();
  utils::sleep(timeout_);
  const utils::Duration_t slept = utils::getCurrentTime() - sleepBegin;

  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
    addFragmentProcessorIdleSample(slept);

  fragmentStore_.addToStaleEventTimes(slept);
}
void FragmentProcessor::startWorkLoop | ( | std::string | workloopName | ) |
Create and start the fragment processing workloop
Definition at line 48 of file FragmentProcessor.cc.
References app_, alignCSCRings::e, Exception, stor::utils::getIdentifier(), lumiQueryAPI::msg, processMessages(), processWL_, and AlCaHLTBitMon_QueryRunRegistry::string.
// Obtains the "waiting" workloop named <identifier><workloopName> from the
// workloop factory and, unless it is already active, submits processMessages()
// as its action and activates it. An xcept::Exception raised on the way is
// rethrown as stor::exception::FragmentProcessing.
{
  try
  {
    const std::string prefix =
      utils::getIdentifier(app_->getApplicationDescriptor());

    processWL_ = toolbox::task::getWorkLoopFactory()->
      getWorkLoop( prefix + workloopName, "waiting" );

    // An active workloop is left as it is.
    if ( processWL_->isActive() )
      return;

    toolbox::task::ActionSignature* action =
      toolbox::task::bind(this, &FragmentProcessor::processMessages,
                          prefix + "ProcessMessages");
    processWL_->submit(action);
    processWL_->activate();
  }
  catch (xcept::Exception& cause)
  {
    std::string text = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
    XCEPT_RETHROW(stor::exception::FragmentProcessing, text, cause);
  }
}
bool stor::FragmentProcessor::actionIsActive_ [private] |
Definition at line 92 of file FragmentProcessor.h.
Referenced by processMessages(), and ~FragmentProcessor().
xdaq::Application* stor::FragmentProcessor::app_ [private] |
Definition at line 84 of file FragmentProcessor.h.
Referenced by startWorkLoop().
EventDistributor stor::FragmentProcessor::eventDistributor_ [private] |
Definition at line 89 of file FragmentProcessor.h.
Referenced by FragmentProcessor(), processAllRegistrations(), and processOneFragmentIfPossible().
FragmentStore stor::FragmentProcessor::fragmentStore_ [private] |
Definition at line 88 of file FragmentProcessor.h.
Referenced by FragmentProcessor(), and processOneFragmentIfPossible().
toolbox::task::WorkLoop* stor::FragmentProcessor::processWL_ [private] |
Definition at line 94 of file FragmentProcessor.h.
Referenced by startWorkLoop(), and ~FragmentProcessor().
SharedResourcesPtr stor::FragmentProcessor::sharedResources_ [private] |
Definition at line 85 of file FragmentProcessor.h.
Referenced by FragmentProcessor(), processAllCommands(), processAllRegistrations(), processMessages(), processOneFragment(), and processOneFragmentIfPossible().
StateMachinePtr stor::FragmentProcessor::stateMachine_ [private] |
Definition at line 87 of file FragmentProcessor.h.
Referenced by FragmentProcessor(), processAllCommands(), and processOneFragment().
boost::posix_time::time_duration stor::FragmentProcessor::timeout_ [private] |
Definition at line 91 of file FragmentProcessor.h.
Referenced by FragmentProcessor(), processAllCommands(), processOneFragment(), and processOneFragmentIfPossible().
WrapperNotifier stor::FragmentProcessor::wrapperNotifier_ [private] |
Definition at line 86 of file FragmentProcessor.h.
Referenced by FragmentProcessor().