#include <StateMachine.h>
Public Types | |
typedef bsc::transition< Stop, DrainingQueues > | DT |
typedef bsc::in_state_reaction < EndRun, Processing,&Processing::logEndRunRequest > | EndRunIR |
typedef boost::mpl::list< DT, EndRunIR > | reactions |
Public Member Functions | |
void | logEndRunRequest (const EndRun &request) |
Processing (my_context) | |
virtual | ~Processing () |
Private Member Functions | |
virtual void | do_entryActionWork () |
virtual void | do_exitActionWork () |
virtual void | do_moveToFailedState (xcept::Exception &exception) const |
virtual void | do_noFragmentToProcess () const |
virtual void | do_processI2OFragment (I2OChain &frag) const |
virtual std::string | do_stateName () const |
Processing state
Definition at line 503 of file StateMachine.h.
typedef bsc::transition<Stop,DrainingQueues> stor::Processing::DT |
Definition at line 510 of file StateMachine.h.
typedef bsc::in_state_reaction<EndRun,Processing,&Processing::logEndRunRequest> stor::Processing::EndRunIR |
Definition at line 511 of file StateMachine.h.
typedef boost::mpl::list<DT,EndRunIR> stor::Processing::reactions |
Definition at line 512 of file StateMachine.h.
Processing::Processing | ( | my_context | c | ) |
Definition at line 22 of file Processing.cc.
References stor::Operations::safeEntryAction().
: my_base(c) { safeEntryAction(); }
Processing::~Processing | ( | ) | [virtual] |
Definition at line 35 of file Processing.cc.
References stor::Operations::safeExitAction().
{ safeExitAction(); }
void Processing::do_entryActionWork | ( | ) | [private, virtual] |
Implements stor::Operations.
Definition at line 27 of file Processing.cc.
References stor::Operations::stateName().
{ TransitionRecord tr( stateName(), true ); outermost_context().updateHistory( tr ); outermost_context().setExternallyVisibleState( "Enabled" ); outermost_context().getNotifier()->reportNewState( "Enabled" ); }
void Processing::do_exitActionWork | ( | ) | [private, virtual] |
Implements stor::Operations.
Definition at line 40 of file Processing.cc.
References stor::Operations::stateName().
{ TransitionRecord tr( stateName(), false ); outermost_context().updateHistory( tr ); }
void Processing::do_moveToFailedState | ( | xcept::Exception & | exception | ) | const [private, virtual] |
Implements stor::Operations.
Definition at line 51 of file Processing.cc.
{ outermost_context().getSharedResources()->moveToFailedState( exception ); }
void Processing::do_noFragmentToProcess | ( | ) | const [private, virtual] |
Reimplemented from stor::Operations.
Definition at line 114 of file Processing.cc.
References stor::WorkerThreadParams::staleFragmentTimeOut_.
{ I2OChain staleEvent; WorkerThreadParams workerParams = outermost_context().getSharedResources()->configuration_->getWorkerThreadParams(); bool gotStaleEvent = outermost_context().getFragmentStore()-> getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_); if ( gotStaleEvent ) { outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent); outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent); } outermost_context().getEventDistributor()->checkForStaleConsumers(); }
void Processing::do_processI2OFragment | ( | I2OChain & | frag | ) | const [private, virtual] |
Reimplemented from stor::Operations.
Definition at line 62 of file Processing.cc.
References stor::FragmentStore::addFragment(), stor::I2OChain::assertRunNumber(), stor::AlarmHandler::ERROR, stor::FragmentStore::memoryUsed(), stor::Operations::noFragmentToProcess(), inputsource_file_cfi::runNumber, stor::ThroughputMonitorCollection::setFragmentStoreMemoryUsed(), stor::ThroughputMonitorCollection::setFragmentStoreSize(), and stor::FragmentStore::size().
{ static unsigned int noFragmentCount = 0; FragmentStore* fragmentStore = outermost_context().getFragmentStore(); bool completed = fragmentStore->addFragment(frag); if ( completed ) { // The run number check has to be done before the event is added to the // queues, as for some event types, e.g. error events, the run number // match is enforced. try { uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber(); frag.assertRunNumber(runNumber); } catch(stor::exception::RunNumberMismatch &e) { // Just raise an alarm, but continue to process the event outermost_context().getSharedResources()->statisticsReporter_-> alarmHandler()->notifySentinel(AlarmHandler::ERROR, e); } outermost_context().getEventDistributor()->addEventToRelevantQueues(frag); outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag); } else { // Only do the check every 100th fragment // TODO: shall we make this number configurable? ++noFragmentCount; if ( noFragmentCount >= 100 ) { noFragmentCount = 0; this->noFragmentToProcess(); } } // 12-Aug-2009, KAB - I put the sampling of the fragment store size // *after* the code to add fragments to the store and move them from the // fragment store to the relevant queues (when needed) so that the baseline // number of events in the fragment store is zero. For example, when // disk writing is slow or stopped, the stream queue fills up, and there // is backpressure within the SM, the true number of events in the fragment // store is zero, and putting the sampling here reflects that. ThroughputMonitorCollection& tmc = outermost_context().getSharedResources()->statisticsReporter_-> getThroughputMonitorCollection(); tmc.setFragmentStoreSize(fragmentStore->size()); tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed()); }
string Processing::do_stateName | ( | ) | const [private, virtual] |
Implements stor::Operations.
Definition at line 46 of file Processing.cc.
{ return std::string( "Processing" ); }
void Processing::logEndRunRequest | ( | const EndRun & | request | ) |
Definition at line 56 of file Processing.cc.
{ outermost_context().unconsumed_event( request ); }