CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
stor::FragmentProcessor Class Reference

#include <FragmentProcessor.h>

Inheritance diagram for stor::FragmentProcessor:

Public Member Functions

 FragmentProcessor (xdaq::Application *app, SharedResourcesPtr sr)
 
bool processMessages (toolbox::task::WorkLoop *)
 
void startWorkLoop (std::string workloopName)
 
 ~FragmentProcessor ()
 

Private Member Functions

void processAllCommands ()
 
void processAllRegistrations ()
 
void processOneFragment ()
 
void processOneFragmentIfPossible ()
 

Private Attributes

bool actionIsActive_
 
xdaq::Application * app_
 
EventDistributor eventDistributor_
 
FragmentStore fragmentStore_
 
toolbox::task::WorkLoop * processWL_
 
SharedResourcesPtr sharedResources_
 
StateMachinePtr stateMachine_
 
boost::posix_time::time_duration timeout_
 
WrapperNotifier wrapperNotifier_
 

Detailed Description

Processes I2O event fragments.

The processor pops the next fragment from the FragmentQueue and adds it to the FragmentStore. If that fragment completes an event, the complete event is handed to the EventDistributor.

Author:
mommsen
Revision:
1.6
Date:
2011/03/07 15:31:32

Definition at line 40 of file FragmentProcessor.h.

Constructor & Destructor Documentation

FragmentProcessor::FragmentProcessor ( xdaq::Application *  app,
SharedResourcesPtr  sr 
)

Definition at line 20 of file FragmentProcessor.cc.

References eventDistributor_, stor::WorkerThreadParams::FPdeqWaitTime_, fragmentStore_, sharedResources_, stateMachine_, timeout_, and wrapperNotifier_.

21  :
22  app_(app),
23  sharedResources_(sr),
24  wrapperNotifier_( app ),
25  fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
27  actionIsActive_(true)
28 {
31  sharedResources_ ) );
32  stateMachine_->initiate();
33 
34  WorkerThreadParams workerParams =
35  sharedResources_->configuration_->getWorkerThreadParams();
36  timeout_ = workerParams.FPdeqWaitTime_;
37 }
SharedResourcesPtr sharedResources_
xdaq::Application * app_
EventDistributor eventDistributor_
boost::posix_time::time_duration FPdeqWaitTime_
boost::posix_time::time_duration timeout_
WrapperNotifier wrapperNotifier_
StateMachinePtr stateMachine_
FragmentProcessor::~FragmentProcessor ( )

Definition at line 39 of file FragmentProcessor.cc.

References actionIsActive_, and processWL_.

40 {
41  // Stop the activity
42  actionIsActive_ = false;
43 
44  // Cancel the workloop (will wait until the action has finished)
45  processWL_->cancel();
46 }
toolbox::task::WorkLoop * processWL_

Member Function Documentation

void FragmentProcessor::processAllCommands ( )
private

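Processes all state machine events in the command queue. If at least one command was processed, the dequeue timeout is re-read from the worker thread parameters, because a configuration transition may have changed it.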

Definition at line 162 of file FragmentProcessor.cc.

References stor::WorkerThreadParams::FPdeqWaitTime_, sharedResources_, stateMachine_, and timeout_.

Referenced by processMessages().

163 {
164  CommandQueuePtr cq = sharedResources_->commandQueue_;
165  stor::EventPtr_t evt;
166  bool gotCommand = false;
167 
168  while( cq->deqNowait( evt ) )
169  {
170  gotCommand = true;
171  stateMachine_->process_event( *evt );
172  }
173 
174  // the timeout value may have changed if the transition was
175  // a Configuration transition, so check for a new value here
176  if (gotCommand)
177  {
178  WorkerThreadParams workerParams =
179  sharedResources_->configuration_->getWorkerThreadParams();
180  timeout_ = workerParams.FPdeqWaitTime_;
181  }
182 }
SharedResourcesPtr sharedResources_
boost::shared_ptr< CommandQueue > CommandQueuePtr
Definition: CommandQueue.h:23
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
Definition: CommandQueue.h:21
boost::posix_time::time_duration FPdeqWaitTime_
boost::posix_time::time_duration timeout_
StateMachinePtr stateMachine_
void FragmentProcessor::processAllRegistrations ( )
private

Processes all consumer registrations in the registration queue.

Definition at line 185 of file FragmentProcessor.cc.

References eventDistributor_, and sharedResources_.

Referenced by processMessages().

186 {
187  RegPtr regPtr;
188  RegistrationQueuePtr regQueue =
189  sharedResources_->registrationQueue_;
190  while ( regQueue->deqNowait( regPtr ) )
191  {
192  regPtr->registerMe( &eventDistributor_ );
193  }
194 }
SharedResourcesPtr sharedResources_
boost::shared_ptr< RegistrationInfoBase > RegPtr
EventDistributor eventDistributor_
boost::shared_ptr< RegistrationQueue > RegistrationQueuePtr
bool FragmentProcessor::processMessages ( toolbox::task::WorkLoop *  )

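The workloop action. It processes state machine commands from the command queue, processes consumer registrations, and handles I2O messages retrieved from the FragmentQueue. It returns the value of actionIsActive_, which the destructor sets to false to stop the activity.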

Definition at line 74 of file FragmentProcessor.cc.

References actionIsActive_, alignCSCRings::e, stor::AlarmHandler::ERROR, cppFunctionSkipper::exception, edm::hlt::Exception, processAllCommands(), processAllRegistrations(), processOneFragmentIfPossible(), sharedResources_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by startWorkLoop().

75 {
76  std::string errorMsg;
77 
78  try
79  {
80  errorMsg = "Failed to process state machine events: ";
82 
83  errorMsg = "Failed to process consumer registrations: ";
85 
86  errorMsg = "Failed to process an event fragment: ";
88  }
89  catch(stor::exception::RBLookupFailed &e)
90  {
91  sharedResources_->alarmHandler_->
92  notifySentinel(AlarmHandler::ERROR, e);
93  }
94  catch(xcept::Exception &e)
95  {
96  XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
97  sentinelException, errorMsg, e );
98  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
99  }
100  catch(std::exception &e)
101  {
102  errorMsg += e.what();
103  XCEPT_DECLARE(stor::exception::FragmentProcessing,
104  sentinelException, errorMsg);
105  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
106  }
107  catch(...)
108  {
109  errorMsg += "Unknown exception";
110  XCEPT_DECLARE(stor::exception::FragmentProcessing,
111  sentinelException, errorMsg);
112  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
113  }
114 
115  return actionIsActive_;
116 }
SharedResourcesPtr sharedResources_
void FragmentProcessor::processOneFragment ( )
private

Processes a single fragment. This should only be called once it has been determined that there is a place to put the fragment.

Definition at line 136 of file FragmentProcessor.cc.

References stor::utils::getCurrentTime(), stor::I2OChain::memoryUsed(), sharedResources_, stateMachine_, and timeout_.

Referenced by processOneFragmentIfPossible().

137 {
138  I2OChain fragment;
139  FragmentQueuePtr fq = sharedResources_->fragmentQueue_;
141  if (fq->deqTimedWait(fragment, timeout_))
142  {
143  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
144  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
145  addFragmentProcessorIdleSample(elapsedTime);
146  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
147  addPoppedFragmentSample(fragment.memoryUsed());
148 
149  stateMachine_->getCurrentState().processI2OFragment(fragment);
150  }
151  else
152  {
153  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
154  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
155  addFragmentProcessorIdleSample(elapsedTime);
156 
157  stateMachine_->getCurrentState().noFragmentToProcess();
158  }
159 }
TimePoint_t getCurrentTime()
Definition: Utils.h:158
SharedResourcesPtr sharedResources_
boost::shared_ptr< FragmentQueue > FragmentQueuePtr
Definition: FragmentQueue.h:22
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
boost::posix_time::time_duration timeout_
size_t memoryUsed() const
Definition: I2OChain.cc:426
StateMachinePtr stateMachine_
void FragmentProcessor::processOneFragmentIfPossible ( )
private

Processes a single fragment if there is a place to put it.

Definition at line 118 of file FragmentProcessor.cc.

References stor::FragmentStore::addToStaleEventTimes(), eventDistributor_, fragmentStore_, stor::EventDistributor::full(), stor::FragmentStore::full(), stor::utils::getCurrentTime(), processOneFragment(), sharedResources_, stor::utils::sleep(), and timeout_.

Referenced by processMessages().

119 {
121  {
123 
125 
126  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
127  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
128  addFragmentProcessorIdleSample(elapsedTime);
129 
131  }
132  else
134 }
TimePoint_t getCurrentTime()
Definition: Utils.h:158
SharedResourcesPtr sharedResources_
const bool full() const
void sleep(Duration_t)
Definition: Utils.h:163
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
bool full() const
Definition: FragmentStore.h:81
EventDistributor eventDistributor_
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void addToStaleEventTimes(const utils::Duration_t)
boost::posix_time::time_duration timeout_
void FragmentProcessor::startWorkLoop ( std::string  workloopName)

Creates and starts the fragment-processing workloop.

Definition at line 48 of file FragmentProcessor.cc.

References app_, alignCSCRings::e, edm::hlt::Exception, stor::utils::getIdentifier(), lumiQueryAPI::msg, processMessages(), processWL_, and AlCaHLTBitMon_QueryRunRegistry::string.

49 {
50  try
51  {
52  std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
53 
54  processWL_ = toolbox::task::getWorkLoopFactory()->
55  getWorkLoop( identifier + workloopName, "waiting" );
56 
57  if ( ! processWL_->isActive() )
58  {
59  toolbox::task::ActionSignature* processAction =
60  toolbox::task::bind(this, &FragmentProcessor::processMessages,
61  identifier + "ProcessMessages");
62  processWL_->submit(processAction);
63 
64  processWL_->activate();
65  }
66  }
67  catch (xcept::Exception& e)
68  {
69  std::string msg = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
70  XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, e);
71  }
72 }
xdaq::Application * app_
toolbox::task::WorkLoop * processWL_
bool processMessages(toolbox::task::WorkLoop *)
std::string getIdentifier(xdaq::ApplicationDescriptor *)
Definition: Utils.cc:72

Member Data Documentation

bool stor::FragmentProcessor::actionIsActive_
private

Definition at line 92 of file FragmentProcessor.h.

Referenced by processMessages(), and ~FragmentProcessor().

xdaq::Application* stor::FragmentProcessor::app_
private

Definition at line 84 of file FragmentProcessor.h.

Referenced by startWorkLoop().

EventDistributor stor::FragmentProcessor::eventDistributor_
private
FragmentStore stor::FragmentProcessor::fragmentStore_
private

Definition at line 88 of file FragmentProcessor.h.

Referenced by FragmentProcessor(), and processOneFragmentIfPossible().

toolbox::task::WorkLoop* stor::FragmentProcessor::processWL_
private

Definition at line 94 of file FragmentProcessor.h.

Referenced by startWorkLoop(), and ~FragmentProcessor().

SharedResourcesPtr stor::FragmentProcessor::sharedResources_
private
StateMachinePtr stor::FragmentProcessor::stateMachine_
private

Definition at line 87 of file FragmentProcessor.h.

Referenced by FragmentProcessor(), processAllCommands(), and processOneFragment().

boost::posix_time::time_duration stor::FragmentProcessor::timeout_
private
WrapperNotifier stor::FragmentProcessor::wrapperNotifier_
private

Definition at line 86 of file FragmentProcessor.h.

Referenced by FragmentProcessor().