CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Processing.cc
Go to the documentation of this file.
1 // $Id: Processing.cc,v 1.19 2011/11/08 10:48:41 mommsen Exp $
3 
14 
15 #include <iostream>
16 #include <sstream>
17 
18 #include "xcept/tools.h"
19 
20 using namespace std;
21 using namespace stor;
22 
23 Processing::Processing( my_context c ): my_base(c)
24 {
26 }
27 
29 {
30  TransitionRecord tr( stateName(), true );
31  outermost_context().updateHistory( tr );
32  outermost_context().setExternallyVisibleState( "Enabled" );
33  outermost_context().getNotifier()->reportNewState( "Enabled" );
34 }
35 
37 {
39 }
40 
42 {
43  TransitionRecord tr( stateName(), false );
44  outermost_context().updateHistory( tr );
45 }
46 
48 {
49  return std::string( "Processing" );
50 }
51 
53 {
54  outermost_context().getSharedResources()->alarmHandler_->moveToFailedState( exception );
55 }
56 
57 void Processing::logEndRunRequest( const EndRun& request )
58 {
59  outermost_context().unconsumed_event( request );
60 }
61 
62 void
64 {
65  static unsigned int noFragmentCount = 0;
66 
67  FragmentStore* fragmentStore = outermost_context().getFragmentStore();
68  bool completed = fragmentStore->addFragment(frag);
69  if ( completed )
70  {
71  // The run number check has to be done before the event is added to the
72  // queues, as for some event types, e.g. error events, the run number
73  // match is enforced.
74  try
75  {
76  uint32_t runNumber = outermost_context().getSharedResources()->configuration_->getRunNumber();
77  frag.assertRunNumber(runNumber);
78  }
79  catch(stor::exception::RunNumberMismatch &e)
80  {
81  // Just raise an alarm, but continue to process the event
82  outermost_context().getSharedResources()->
83  alarmHandler_->notifySentinel(AlarmHandler::ERROR, e);
84  }
85  outermost_context().getEventDistributor()->addEventToRelevantQueues(frag);
86  outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(frag);
87  }
88  else
89  {
90  // Only do the check every 100th fragment
91  // TODO: shall we make this number configurable?
92  ++noFragmentCount;
93  if ( noFragmentCount >= 100 )
94  {
95  noFragmentCount = 0;
96  this->noFragmentToProcess();
97  }
98  }
99 
100  // 12-Aug-2009, KAB - I put the sampling of the fragment store size
101  // *after* the code to add fragments to the store and move them from the
102  // fragment store to the relevant queues (when needed) so that the baseline
103  // number of events in the fragment store is zero. For example, when
104  // disk writing is slow or stopped, the stream queue fills up, and there
105  // is backpressure within the SM, the true number of events in the fragment
106  // store is zero, and putting the sampling here reflects that.
108  outermost_context().getSharedResources()->statisticsReporter_->
109  getThroughputMonitorCollection();
110  tmc.setFragmentStoreSize(fragmentStore->size());
111  tmc.setFragmentStoreMemoryUsed(fragmentStore->memoryUsed());
112 }
113 
114 void
116 {
117  I2OChain staleEvent;
118 
119  WorkerThreadParams workerParams =
120  outermost_context().getSharedResources()->configuration_->getWorkerThreadParams();
121  bool gotStaleEvent =
122  outermost_context().getFragmentStore()->
123  getStaleEvent(staleEvent, workerParams.staleFragmentTimeOut_);
124 
125  if ( gotStaleEvent )
126  {
127  outermost_context().getSharedResources()->discardManager_->sendDiscardMessage(staleEvent);
128  outermost_context().getEventDistributor()->addEventToRelevantQueues(staleEvent);
129  }
130  outermost_context().getEventDistributor()->checkForStaleConsumers();
131 }
132 
133 
void logEndRunRequest(const EndRun &request)
Definition: Processing.cc:57
std::string stateName() const
Definition: Operations.cc:39
const bool addFragment(I2OChain &)
void noFragmentToProcess() const
Definition: Operations.cc:34
void assertRunNumber(uint32_t runNumber)
Definition: I2OChain.cc:575
virtual ~Processing()
Definition: Processing.cc:36
virtual void do_entryActionWork()
Definition: Processing.cc:28
void safeExitAction()
Definition: Operations.cc:108
utils::Duration_t staleFragmentTimeOut_
size_t memoryUsed() const
Definition: FragmentStore.h:95
virtual std::string do_stateName() const
Definition: Processing.cc:47
virtual void do_exitActionWork()
Definition: Processing.cc:41
unsigned int size() const
Definition: FragmentStore.h:88
virtual void do_moveToFailedState(xcept::Exception &exception) const
Definition: Processing.cc:52
virtual void do_processI2OFragment(I2OChain &frag) const
Definition: Processing.cc:63
virtual void do_noFragmentToProcess() const
Definition: Processing.cc:115
void safeEntryAction()
Definition: Operations.cc:77