CMS 3D CMS Logo

Public Types | Public Member Functions | Private Member Functions

stor::Processing Class Reference

#include <StateMachine.h>

Inheritance diagram for stor::Processing:
state stor::Operations

List of all members.

Public Types

typedef bsc::transition< Stop,
DrainingQueues
DT
typedef bsc::in_state_reaction
< EndRun, Processing,&Processing::logEndRunRequest > 
EndRunIR
typedef boost::mpl::list< DT,
EndRunIR
reactions

Public Member Functions

void logEndRunRequest (const EndRun &request)
 Processing (my_context)
virtual ~Processing ()

Private Member Functions

virtual void do_entryActionWork ()
virtual void do_exitActionWork ()
virtual void do_moveToFailedState (xcept::Exception &exception) const
virtual void do_noFragmentToProcess () const
virtual void do_processI2OFragment (I2OChain &frag) const
virtual std::string do_stateName () const

Detailed Description

Processing state

Author:
mommsen
Revision:
1.11
Date:
2011/03/07 15:31:32

Definition at line 503 of file StateMachine.h.


Member Typedef Documentation

typedef bsc::transition<Stop,DrainingQueues> stor::Processing::DT

Definition at line 510 of file StateMachine.h.

typedef bsc::in_state_reaction<EndRun,Processing,&Processing::logEndRunRequest> stor::Processing::EndRunIR

Definition at line 511 of file StateMachine.h.

typedef boost::mpl::list<DT,EndRunIR> stor::Processing::reactions

Definition at line 512 of file StateMachine.h.


Constructor & Destructor Documentation

Processing::Processing ( my_context  c)

Definition at line 22 of file Processing.cc.

References stor::Operations::safeEntryAction().

                                    : my_base(c)
{
  safeEntryAction();
}
Processing::~Processing ( ) [virtual]

Definition at line 35 of file Processing.cc.

References stor::Operations::safeExitAction().


Member Function Documentation

void Processing::do_entryActionWork ( ) [private, virtual]

Implements stor::Operations.

Definition at line 27 of file Processing.cc.

References stor::Operations::stateName().

{
  TransitionRecord tr( stateName(), true );
  outermost_context().updateHistory( tr );
  outermost_context().setExternallyVisibleState( "Enabled" );
  outermost_context().getNotifier()->reportNewState( "Enabled" );
}
void Processing::do_exitActionWork ( ) [private, virtual]

Implements stor::Operations.

Definition at line 40 of file Processing.cc.

References stor::Operations::stateName().

{
  TransitionRecord tr( stateName(), false );
  outermost_context().updateHistory( tr );
}
void Processing::do_moveToFailedState ( xcept::Exception exception) const [private, virtual]

Implements stor::Operations.

Definition at line 51 of file Processing.cc.

{
  outermost_context().getSharedResources()->moveToFailedState( exception );
}
void Processing::do_noFragmentToProcess ( ) const [private, virtual]

Reimplemented from stor::Operations.

Definition at line 114 of file Processing.cc.

References stor::WorkerThreadParams::staleFragmentTimeOut_.

{
  I2OChain staleEvent;

  WorkerThreadParams workerParams =
    outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
  bool gotStaleEvent = 
    outermost_context().getFragmentStore()->
    getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);

  if ( gotStaleEvent )
  {
    outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
    outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
  }
  outermost_context().getEventDistributor()->checkForStaleConsumers();
}
void Processing::do_processI2OFragment ( I2OChain frag) const [private, virtual]

Reimplemented from stor::Operations.

Definition at line 62 of file Processing.cc.

References stor::FragmentStore::addFragment(), stor::I2OChain::assertRunNumber(), stor::AlarmHandler::ERROR, stor::FragmentStore::memoryUsed(), stor::Operations::noFragmentToProcess(), inputsource_file_cfi::runNumber, stor::ThroughputMonitorCollection::setFragmentStoreMemoryUsed(), stor::ThroughputMonitorCollection::setFragmentStoreSize(), and stor::FragmentStore::size().

{
  static unsigned int noFragmentCount = 0;

  FragmentStore* fragmentStore = outermost_context().getFragmentStore();
  bool completed = fragmentStore->addFragment(frag);
  if ( completed )
  {
    // The run number check has to be done before the event is added to the
    // queues, as for some event types, e.g. error events, the run number
    // match is enforced.
    try
    {
      uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
      frag.assertRunNumber(runNumber);
    }
    catch(stor::exception::RunNumberMismatch &e)
    {
      // Just raise an alarm, but continue to process the event
      outermost_context().getSharedResources()->statisticsReporter_->
        alarmHandler()->notifySentinel(AlarmHandler::ERROR, e);
    }
    outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
    outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
  }
  else
  {
    // Only do the check every 100th fragment
    // TODO: shall we make this number configurable?
    ++noFragmentCount;
    if ( noFragmentCount >= 100 )
    {
      noFragmentCount = 0;
      this->noFragmentToProcess();
    }
  }

  // 12-Aug-2009, KAB - I put the sampling of the fragment store size
  // *after* the code to add fragments to the store and move them from the
  // fragment store to the relevant queues (when needed) so that the baseline
  // number of events in the fragment store is zero.  For example, when
  // disk writing is slow or stopped, the stream queue fills up, and there
  // is backpressure within the SM, the true number of events in the fragment
  // store is zero, and putting the sampling here reflects that.
  ThroughputMonitorCollection& tmc = 
    outermost_context().getSharedResources()->statisticsReporter_->
    getThroughputMonitorCollection();
  tmc.setFragmentStoreSize(fragmentStore->size());
  tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
}
string Processing::do_stateName ( ) const [private, virtual]

Implements stor::Operations.

Definition at line 46 of file Processing.cc.

{
  return std::string( "Processing" );
}
void Processing::logEndRunRequest ( const EndRun request)

Definition at line 56 of file Processing.cc.

{
  outermost_context().unconsumed_event( request );
}