CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Processing.cc
Go to the documentation of this file.
1 // $Id: Processing.cc,v 1.17.6.1 2011/03/07 11:33:05 mommsen Exp $
3 
13 
14 #include <iostream>
15 #include <sstream>
16 
17 #include "xcept/tools.h"
18 
19 using namespace std;
20 using namespace stor;
21 
// Constructs the Processing state from the supplied context, which is
// forwarded unchanged to the my_base constructor.
// NOTE(review): listing line 24 (the only line of the constructor body) is
// absent from this extract. The cross-reference index after the listing
// mentions safeEntryAction(), which is presumably the dropped call -- confirm
// against the real file before relying on this body being empty.
22 Processing::Processing( my_context c ): my_base(c)
23 {
25 }
26 
// NOTE(review): the signature (listing line 27) is missing from this extract;
// the cross-reference index attributes Processing.cc:27 to
// "virtual void do_entryActionWork()".
//
// Records a TransitionRecord built from stateName() and `true` in the state
// machine history, then publishes "Enabled" both as the externally visible
// state and to the notifier.
28 {
// Presumably `true` marks this record as a state entry (the sibling body at
// listing lines 41-44 passes `false`) -- confirm against TransitionRecord.
29  TransitionRecord tr( stateName(), true );
30  outermost_context().updateHistory( tr );
31  outermost_context().setExternallyVisibleState( "Enabled" );
32  outermost_context().getNotifier()->reportNewState( "Enabled" );
33 }
34 
// NOTE(review): both the signature (listing line 35) and the single body line
// (listing line 37) are missing from this extract. The cross-reference index
// attributes Processing.cc:35 to "virtual ~Processing()" and also mentions
// safeExitAction(), which is presumably the dropped call -- confirm against
// the real file.
36 {
38 }
39 
// NOTE(review): the signature (listing line 40) is missing from this extract;
// the cross-reference index attributes Processing.cc:40 to
// "virtual void do_exitActionWork()".
//
// Records a TransitionRecord built from stateName() and `false` in the state
// machine history. Unlike the body at listing lines 28-33, it does not touch
// the externally visible state or the notifier.
41 {
42  TransitionRecord tr( stateName(), false );
43  outermost_context().updateHistory( tr );
44 }
45 
// NOTE(review): the signature (listing line 46) is missing from this extract;
// the cross-reference index attributes Processing.cc:46 to
// "virtual std::string do_stateName() const".
//
// Returns the fixed name of this state, "Processing".
47 {
48  return std::string( "Processing" );
49 }
50 
// NOTE(review): the signature (listing line 51) is missing from this extract;
// the cross-reference index attributes Processing.cc:51 to
// "virtual void do_moveToFailedState(xcept::Exception &exception) const".
//
// Hands `exception` to the shared resources' moveToFailedState(); nothing
// else is done here.
52 {
53  outermost_context().getSharedResources()->moveToFailedState( exception );
54 }
55 
56 void Processing::logEndRunRequest( const EndRun& request )
57 {
58  outermost_context().unconsumed_event( request );
59 }
60 
// NOTE(review): the signature (listing line 62) is missing from this extract;
// the cross-reference index attributes Processing.cc:62 to
// "virtual void do_processI2OFragment(I2OChain &frag) const".
//
// Adds `frag` to the fragment store. If addFragment() reports completion, the
// run number is checked, the event is added to the relevant queues and a
// discard message is sent. Otherwise a call counter is advanced and, on every
// 100th such call, noFragmentToProcess() is invoked. In both cases the
// fragment store size and memory usage are reported to `tmc` at the end.
61 void
63 {
// Function-local static: the count persists across calls to this function.
64  static unsigned int noFragmentCount = 0;
65 
66  FragmentStore* fragmentStore = outermost_context().getFragmentStore();
67  bool completed = fragmentStore->addFragment(frag);
68  if ( completed )
69  {
70  // The run number check has to be done before the event is added to the
71  // queues, as for some event types, e.g. error events, the run number
72  // match is enforced.
73  try
74  {
75  uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
76  frag.assertRunNumber(runNumber);
77  }
78  catch(stor::exception::RunNumberMismatch &e)
79  {
80  // Just raise an alarm, but continue to process the event
81  outermost_context().getSharedResources()->statisticsReporter_->
82  alarmHandler()->notifySentinel(AlarmHandler::ERROR, e);
83  }
// Reached whether or not the run number matched: only RunNumberMismatch is
// caught above, and the handler does not rethrow.
84  outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
85  outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
86  }
87  else
88  {
89  // Only do the check every 100th fragment
90  // TODO: shall we make this number configurable?
91  ++noFragmentCount;
92  if ( noFragmentCount >= 100 )
93  {
// Reset so that the next check happens after another 100 calls on this branch.
94  noFragmentCount = 0;
95  this->noFragmentToProcess();
96  }
97  }
98 
99  // 12-Aug-2009, KAB - I put the sampling of the fragment store size
100  // *after* the code to add fragments to the store and move them from the
101  // fragment store to the relevant queues (when needed) so that the baseline
102  // number of events in the fragment store is zero. For example, when
103  // disk writing is slow or stopped, the stream queue fills up, and there
104  // is backpressure within the SM, the true number of events in the fragment
105  // store is zero, and putting the sampling here reflects that.
// NOTE(review): listing line 106 is missing from this extract. It presumably
// declares `tmc` and begins the expression continued on the next two lines --
// confirm the exact declaration against the real file.
107  outermost_context().getSharedResources()->statisticsReporter_->
108  getThroughputMonitorCollection();
109  tmc.setFragmentStoreSize(fragmentStore->size());
110  tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
111 }
112 
// NOTE(review): the signature (listing line 114) is missing from this extract;
// the cross-reference index attributes Processing.cc:114 to
// "virtual void do_noFragmentToProcess() const".
//
// Asks the fragment store for a stale event, using the configured
// staleFragmentTimeOut_. If one is returned, a discard message is sent for it
// and it is added to the relevant queues. checkForStaleConsumers() is called
// on the event distributor in every case.
113 void
115 {
// Filled in by getStaleEvent() below when it returns true.
116  I2OChain staleEvent;
117 
118  WorkerThreadParams workerParams =
119  outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
120  bool gotStaleEvent =
121  outermost_context().getFragmentStore()->
122  getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);
123 
124  if ( gotStaleEvent )
125  {
// NOTE(review): the discard message is sent before the event is queued here,
// whereas the completed-event path at listing lines 84-85 queues first and
// discards second -- confirm whether the ordering difference is intentional.
126  outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
127  outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
128  }
129  outermost_context().getEventDistributor()->checkForStaleConsumers();
130 }
131 
132 
void logEndRunRequest(const EndRun &request)
Definition: Processing.cc:56
std::string stateName() const
Definition: Operations.cc:39
const bool addFragment(I2OChain &)
void noFragmentToProcess() const
Definition: Operations.cc:34
void assertRunNumber(uint32_t runNumber)
Definition: I2OChain.cc:565
virtual ~Processing()
Definition: Processing.cc:35
virtual void do_entryActionWork()
Definition: Processing.cc:27
void safeExitAction()
Definition: Operations.cc:108
utils::Duration_t staleFragmentTimeOut_
size_t memoryUsed() const
Definition: FragmentStore.h:96
virtual std::string do_stateName() const
Definition: Processing.cc:46
virtual void do_exitActionWork()
Definition: Processing.cc:40
unsigned int size() const
Definition: FragmentStore.h:89
virtual void do_moveToFailedState(xcept::Exception &exception) const
Definition: Processing.cc:51
virtual void do_processI2OFragment(I2OChain &frag) const
Definition: Processing.cc:62
virtual void do_noFragmentToProcess() const
Definition: Processing.cc:114
void safeEntryAction()
Definition: Operations.cc:77