CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Types | Public Member Functions | Private Member Functions
stor::Processing Class Reference

#include <StateMachine.h>

Inheritance diagram for stor::Processing:
stor::Operations

Public Types

typedef bsc::transition< Stop,
DrainingQueues >
DT
 
typedef bsc::in_state_reaction
< EndRun, Processing,&Processing::logEndRunRequest >
EndRunIR
 
typedef boost::mpl::list< DT,
EndRunIR >
reactions
 

Public Member Functions

void logEndRunRequest (const EndRun &request)
 
 Processing (my_context)
 
virtual ~Processing ()
 
- Public Member Functions inherited from stor::Operations
void moveToFailedState (xcept::Exception &exception) const
 
void noFragmentToProcess () const
 
 Operations ()
 
void processI2OFragment (I2OChain &frag) const
 
std::string stateName () const
 
virtual ~Operations ()=0
 

Private Member Functions

virtual void do_entryActionWork ()
 
virtual void do_exitActionWork ()
 
virtual void do_moveToFailedState (xcept::Exception &exception) const
 
virtual void do_noFragmentToProcess () const
 
virtual void do_processI2OFragment (I2OChain &frag) const
 
virtual std::string do_stateName () const
 

Additional Inherited Members

- Protected Member Functions inherited from stor::Operations
void safeEntryAction ()
 
void safeExitAction ()
 

Detailed Description

Processing state

Author:
mommsen
Revision:
1.10.12.1
Date:
2011/03/07 11:33:04

Definition at line 503 of file StateMachine.h.

Member Typedef Documentation

typedef bsc::transition<Stop,DrainingQueues> stor::Processing::DT

Definition at line 510 of file StateMachine.h.

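typedef bsc::in_state_reaction<EndRun,Processing,&Processing::logEndRunRequest> stor::Processing::EndRunIR

Definition at line 511 of file StateMachine.h.

typedef boost::mpl::list<DT,EndRunIR> stor::Processing::reactions

Definition at line 512 of file StateMachine.h.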

Constructor & Destructor Documentation

Processing::Processing ( my_context  c)

Definition at line 22 of file Processing.cc.

References stor::Operations::safeEntryAction().

22  : my_base(c)
23 {
24  safeEntryAction();
25 }
void safeEntryAction()
Definition: Operations.cc:77
Processing::~Processing ( )
virtual

Definition at line 35 of file Processing.cc.

References stor::Operations::safeExitAction().

36 {
37  safeExitAction();
38 }
void safeExitAction()
Definition: Operations.cc:108

Member Function Documentation

void Processing::do_entryActionWork ( )
private virtual

Implements stor::Operations.

Definition at line 27 of file Processing.cc.

References stor::Operations::stateName().

28 {
29  TransitionRecord tr( stateName(), true );
30  outermost_context().updateHistory( tr );
31  outermost_context().setExternallyVisibleState( "Enabled" );
32  outermost_context().getNotifier()->reportNewState( "Enabled" );
33 }
std::string stateName() const
Definition: Operations.cc:39
void Processing::do_exitActionWork ( )
private virtual

Implements stor::Operations.

Definition at line 40 of file Processing.cc.

References stor::Operations::stateName().

41 {
42  TransitionRecord tr( stateName(), false );
43  outermost_context().updateHistory( tr );
44 }
std::string stateName() const
Definition: Operations.cc:39
void Processing::do_moveToFailedState ( xcept::Exception &  exception) const
private virtual

Implements stor::Operations.

Definition at line 51 of file Processing.cc.

52 {
53  outermost_context().getSharedResources()->moveToFailedState( exception );
54 }
void Processing::do_noFragmentToProcess ( ) const
private virtual

Reimplemented from stor::Operations.

Definition at line 114 of file Processing.cc.

References stor::WorkerThreadParams::staleFragmentTimeOut_.

115 {
116  I2OChain staleEvent;
117 
118  WorkerThreadParams workerParams =
119  outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
120  bool gotStaleEvent =
121  outermost_context().getFragmentStore()->
122  getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);
123 
124  if ( gotStaleEvent )
125  {
126  outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
127  outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
128  }
129  outermost_context().getEventDistributor()->checkForStaleConsumers();
130 }
utils::Duration_t staleFragmentTimeOut_
void Processing::do_processI2OFragment ( I2OChain &  frag) const
private virtual

Reimplemented from stor::Operations.

Definition at line 62 of file Processing.cc.

References stor::FragmentStore::addFragment(), stor::I2OChain::assertRunNumber(), stor::AlarmHandler::ERROR, stor::FragmentStore::memoryUsed(), stor::Operations::noFragmentToProcess(), inputsource_file_cfi::runNumber, stor::ThroughputMonitorCollection::setFragmentStoreMemoryUsed(), stor::ThroughputMonitorCollection::setFragmentStoreSize(), and stor::FragmentStore::size().

63 {
64  static unsigned int noFragmentCount = 0;
65 
66  FragmentStore* fragmentStore = outermost_context().getFragmentStore();
67  bool completed = fragmentStore->addFragment(frag);
68  if ( completed )
69  {
70  // The run number check has to be done before the event is added to the
71  // queues, as for some event types, e.g. error events, the run number
72  // match is enforced.
73  try
74  {
75  uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
76  frag.assertRunNumber(runNumber);
77  }
78  catch(stor::exception::RunNumberMismatch &e)
79  {
80  // Just raise an alarm, but continue to process the event
81  outermost_context().getSharedResources()->statisticsReporter_->
82  alarmHandler()->notifySentinel(AlarmHandler::ERROR, e);
83  }
84  outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
85  outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
86  }
87  else
88  {
89  // Only do the check every 100th fragment
90  // TODO: shall we make this number configurable?
91  ++noFragmentCount;
92  if ( noFragmentCount >= 100 )
93  {
94  noFragmentCount = 0;
95  this->noFragmentToProcess();
96  }
97  }
98 
99  // 12-Aug-2009, KAB - I put the sampling of the fragment store size
100  // *after* the code to add fragments to the store and move them from the
101  // fragment store to the relevant queues (when needed) so that the baseline
102  // number of events in the fragment store is zero. For example, when
103  // disk writing is slow or stopped, the stream queue fills up, and there
104  // is backpressure within the SM, the true number of events in the fragment
105  // store is zero, and putting the sampling here reflects that.
106  ThroughputMonitorCollection& tmc =
107  outermost_context().getSharedResources()->statisticsReporter_->
108  getThroughputMonitorCollection();
109  tmc.setFragmentStoreSize(fragmentStore->size());
110  tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
111 }
const bool addFragment(I2OChain &)
void noFragmentToProcess() const
Definition: Operations.cc:34
void assertRunNumber(uint32_t runNumber)
Definition: I2OChain.cc:565
size_t memoryUsed() const
Definition: FragmentStore.h:96
unsigned int size() const
Definition: FragmentStore.h:89
string Processing::do_stateName ( ) const
private virtual

Implements stor::Operations.

Definition at line 46 of file Processing.cc.

47 {
48  return std::string( "Processing" );
49 }
void Processing::logEndRunRequest ( const EndRun &  request)

Definition at line 56 of file Processing.cc.

57 {
58  outermost_context().unconsumed_event( request );
59 }