CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FragmentProcessor.cc
Go to the documentation of this file.
1 // $Id: FragmentProcessor.cc,v 1.20 2011/11/08 10:48:40 mommsen Exp $
3 
4 #include <unistd.h>
5 
6 #include "toolbox/task/WorkLoopFactory.h"
7 #include "xcept/tools.h"
8 
16 
17 using namespace stor;
18 
19 
// Constructor. Stores the application pointer and the shared resources, builds
// the wrapper notifier from the application, sizes the fragment store from the
// queue configuration's fragmentStoreMemoryLimitMB_, builds the event
// distributor from the shared resources, and starts with actionIsActive_ true.
// NOTE(review): processWL_ does not appear in the initializer list or in the
// part of the body shown here; it is assigned where the work loop is fetched
// from the factory. Confirm it cannot be read before that assignment.
20 FragmentProcessor::FragmentProcessor( xdaq::Application *app,
21  SharedResourcesPtr sr ) :
22  app_(app),
23  sharedResources_(sr),
24  wrapperNotifier_( app ),
25  fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
26  eventDistributor_(sr),
27  actionIsActive_(true)
28 {
    // NOTE(review): listing lines 29-30 are missing from this extract. The
    // next line is only the tail of a statement whose beginning is not shown
    // (presumably the construction of stateMachine_) - confirm against the
    // original file.
31  sharedResources_ ) );
32  stateMachine_->initiate();
33 
    // Cache the configured FPdeqWaitTime_ in timeout_; timeout_ is the wait
    // passed to the fragment queue's deqTimedWait() elsewhere in this file.
34  WorkerThreadParams workerParams =
35  sharedResources_->configuration_->getWorkerThreadParams();
36  timeout_ = workerParams.FPdeqWaitTime_;
37 }
38 
// NOTE(review): the signature (listing line 39) is missing from this extract;
// the body reads like the class destructor - confirm.
// Clears actionIsActive_, which is the value processMessages() returns, and
// then cancels the work loop.
40 {
41  // Stop the activity
42  actionIsActive_ = false;
43 
44  // Cancel the workloop (will wait until the action has finished)
    // NOTE(review): processWL_ is dereferenced unconditionally. In the code
    // shown it is only assigned when the work loop is obtained from the
    // factory, so confirm this can never run before that has succeeded.
45  processWL_->cancel();
46 }
47 
// NOTE(review): the signature (listing line 48) is missing from this extract.
// The symbol index of this page lists
// `void startWorkLoop(std::string workloopName)`, which matches the
// workloopName used below.
// Fetches the "waiting" work loop named <identifier + workloopName> from the
// work loop factory and stores it in processWL_. If that work loop is not yet
// active, binds processMessages() as its action, submits it and activates the
// loop. Any xcept::Exception raised on the way is rethrown as
// stor::exception::FragmentProcessing.
49 {
50  try
51  {
    // Identifier derived from this application's descriptor; used as a prefix
    // for both the work loop name and the action name.
52  std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
53 
54  processWL_ = toolbox::task::getWorkLoopFactory()->
55  getWorkLoop( identifier + workloopName, "waiting" );
56 
    // Only submit and activate when the loop is not already active.
57  if ( ! processWL_->isActive() )
58  {
59  toolbox::task::ActionSignature* processAction =
60  toolbox::task::bind(this, &FragmentProcessor::processMessages,
61  identifier + "ProcessMessages");
62  processWL_->submit(processAction);
63 
64  processWL_->activate();
65  }
66  }
67  catch (xcept::Exception& e)
68  {
    // The message text is fixed; it does not include workloopName.
69  std::string msg = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
70  XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, e);
71  }
72 }
73 
// Work loop action bound in the work loop start-up code. errorMsg is set
// before each processing stage so that a caught exception is reported with the
// stage that failed. Returns actionIsActive_.
// NOTE(review): presumably a false return stops the work loop from calling
// this action again - confirm against the toolbox::task documentation.
74 bool FragmentProcessor::processMessages(toolbox::task::WorkLoop*)
75 {
76  std::string errorMsg;
77 
78  try
79  {
    // NOTE(review): listing lines 81, 84 and 87 (the statement that follows
    // each errorMsg assignment) are missing from this extract.
80  errorMsg = "Failed to process state machine events: ";
82 
83  errorMsg = "Failed to process consumer registrations: ";
85 
86  errorMsg = "Failed to process an event fragment: ";
88  }
89  catch(stor::exception::RBLookupFailed &e)
90  {
    // Reported to the sentinel at ERROR level only; unlike the handlers
    // below, this one does not call moveToFailedState().
91  sharedResources_->alarmHandler_->
92  notifySentinel(AlarmHandler::ERROR, e);
93  }
94  catch(xcept::Exception &e)
95  {
    // Wrap the original exception together with the stage message, then
    // hand it to moveToFailedState().
96  XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
97  sentinelException, errorMsg, e );
98  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
99  }
100  catch(std::exception &e)
101  {
    // No xcept exception to nest: append what() to the stage message.
102  errorMsg += e.what();
103  XCEPT_DECLARE(stor::exception::FragmentProcessing,
104  sentinelException, errorMsg);
105  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
106  }
107  catch(...)
108  {
109  errorMsg += "Unknown exception";
110  XCEPT_DECLARE(stor::exception::FragmentProcessing,
111  sentinelException, errorMsg);
112  sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
113  }
114 
    // Exceptions are handled above and do not propagate out of this function.
115  return actionIsActive_;
116 }
117 
// NOTE(review): this block is heavily truncated in the extract. Listing lines
// 118 (signature), 120 (the condition), 122, 124, 130 and 133 are missing,
// including the declaration of startTime and the body of the else branch.
// What remains shows the first branch measuring the time elapsed since
// startTime and recording it as a fragment-processor idle sample. Restore the
// missing lines from the original file before relying on this listing.
119 {
121  {
123 
125 
    // Time spent in this branch is reported to the throughput monitor as
    // idle time.
126  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
127  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
128  addFragmentProcessorIdleSample(elapsedTime);
129 
131  }
132  else
134 }
135 
// NOTE(review): the signature (listing line 136) and listing line 140 are
// missing from this extract; line 140 presumably declares startTime, which is
// used below without a visible declaration - confirm.
// Waits up to timeout_ for one fragment from the shared fragment queue. In
// both outcomes the time elapsed since startTime is recorded as a
// fragment-processor idle sample. A dequeued fragment additionally has its
// memoryUsed() recorded as a popped-fragment sample and is passed to the
// current state of the state machine; otherwise the current state is told
// that there was no fragment to process.
137 {
138  I2OChain fragment;
139  FragmentQueuePtr fq = sharedResources_->fragmentQueue_;
141  if (fq->deqTimedWait(fragment, timeout_))
142  {
143  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
144  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
145  addFragmentProcessorIdleSample(elapsedTime);
146  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
147  addPoppedFragmentSample(fragment.memoryUsed());
148 
149  stateMachine_->getCurrentState().processI2OFragment(fragment);
150  }
151  else
152  {
153  utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
154  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
155  addFragmentProcessorIdleSample(elapsedTime);
156 
157  stateMachine_->getCurrentState().noFragmentToProcess();
158  }
159 }
160 
161 
// NOTE(review): the signature (listing line 162) is missing from this extract.
// Drains the shared command queue without waiting, passing every dequeued
// event to the state machine. If at least one event was processed, timeout_
// is re-read from the worker thread configuration.
163 {
164  CommandQueuePtr cq = sharedResources_->commandQueue_;
165  stor::EventPtr_t evt;
166  bool gotCommand = false;
167 
    // deqNowait() is used as the loop condition, so the loop ends as soon as
    // it returns false.
168  while( cq->deqNowait( evt ) )
169  {
170  gotCommand = true;
171  stateMachine_->process_event( *evt );
172  }
173 
174  // the timeout value may have changed if the transition was
175  // a Configuration transition, so check for a new value here
176  if (gotCommand)
177  {
178  WorkerThreadParams workerParams =
179  sharedResources_->configuration_->getWorkerThreadParams();
180  timeout_ = workerParams.FPdeqWaitTime_;
181  }
182 }
183 
184 
// NOTE(review): the signature (listing line 185) is missing from this extract.
// Drains the shared registration queue without waiting. Each dequeued
// registration is asked to register itself with this object's event
// distributor.
186 {
187  RegPtr regPtr;
188  RegistrationQueuePtr regQueue =
189  sharedResources_->registrationQueue_;
190  while ( regQueue->deqNowait( regPtr ) )
191  {
192  regPtr->registerMe( &eventDistributor_ );
193  }
194 }
195 
196 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
SharedResourcesPtr sharedResources_
boost::shared_ptr< RegistrationInfoBase > RegPtr
boost::shared_ptr< CommandQueue > CommandQueuePtr
Definition: CommandQueue.h:23
xdaq::Application * app_
toolbox::task::WorkLoop * processWL_
FragmentProcessor(xdaq::Application *app, SharedResourcesPtr sr)
bool processMessages(toolbox::task::WorkLoop *)
boost::shared_ptr< SharedResources > SharedResourcesPtr
const bool full() const
void sleep(Duration_t)
Definition: Utils.h:163
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
Definition: CommandQueue.h:21
boost::shared_ptr< FragmentQueue > FragmentQueuePtr
Definition: FragmentQueue.h:22
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
bool full() const
Definition: FragmentStore.h:81
EventDistributor eventDistributor_
boost::posix_time::time_duration FPdeqWaitTime_
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void addToStaleEventTimes(const utils::Duration_t)
boost::posix_time::time_duration timeout_
std::string getIdentifier(xdaq::ApplicationDescriptor *)
Definition: Utils.cc:72
void startWorkLoop(std::string workloopName)
size_t memoryUsed() const
Definition: I2OChain.cc:426
WrapperNotifier wrapperNotifier_
StateMachinePtr stateMachine_
boost::shared_ptr< RegistrationQueue > RegistrationQueuePtr