6 #include "toolbox/task/WorkLoopFactory.h"
7 #include "xcept/tools.h"
24 wrapperNotifier_( app ),
25 fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
26 eventDistributor_(sr),
54 processWL_ = toolbox::task::getWorkLoopFactory()->
55 getWorkLoop( identifier + workloopName,
"waiting" );
57 if ( ! processWL_->isActive() )
59 toolbox::task::ActionSignature* processAction =
61 identifier +
"ProcessMessages");
62 processWL_->submit(processAction);
64 processWL_->activate();
69 std::string msg =
"Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
70 XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, e);
80 errorMsg =
"Failed to process state machine events: ";
83 errorMsg =
"Failed to process consumer registrations: ";
86 errorMsg =
"Failed to process an event fragment: ";
89 catch(stor::exception::RBLookupFailed &
e)
96 XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
97 sentinelException, errorMsg, e );
102 errorMsg += e.what();
103 XCEPT_DECLARE(stor::exception::FragmentProcessing,
104 sentinelException, errorMsg);
109 errorMsg +=
"Unknown exception";
110 XCEPT_DECLARE(stor::exception::FragmentProcessing,
111 sentinelException, errorMsg);
128 addFragmentProcessorIdleSample(elapsedTime);
141 if (fq->deqTimedWait(fragment,
timeout_))
145 addFragmentProcessorIdleSample(elapsedTime);
147 addPoppedFragmentSample(fragment.
memoryUsed());
149 stateMachine_->getCurrentState().processI2OFragment(fragment);
155 addFragmentProcessorIdleSample(elapsedTime);
166 bool gotCommand =
false;
168 while( cq->deqNowait( evt ) )
190 while ( regQueue->deqNowait( regPtr ) )
TimePoint_t getCurrentTime()
SharedResourcesPtr sharedResources_
boost::shared_ptr< RegistrationInfoBase > RegPtr
boost::shared_ptr< CommandQueue > CommandQueuePtr
toolbox::task::WorkLoop * processWL_
FragmentProcessor(xdaq::Application *app, SharedResourcesPtr sr)
bool processMessages(toolbox::task::WorkLoop *)
boost::shared_ptr< SharedResources > SharedResourcesPtr
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
void processAllCommands()
void processOneFragmentIfPossible()
boost::shared_ptr< FragmentQueue > FragmentQueuePtr
boost::posix_time::time_duration Duration_t
FragmentStore fragmentStore_
void processOneFragment()
EventDistributor eventDistributor_
boost::posix_time::time_duration FPdeqWaitTime_
boost::posix_time::ptime TimePoint_t
void addToStaleEventTimes(const utils::Duration_t)
boost::posix_time::time_duration timeout_
std::string getIdentifier(xdaq::ApplicationDescriptor *)
void startWorkLoop(std::string workloopName)
size_t memoryUsed() const
WrapperNotifier wrapperNotifier_
void processAllRegistrations()
StateMachinePtr stateMachine_
boost::shared_ptr< RegistrationQueue > RegistrationQueuePtr