Go to the documentation of this file.00001
00003
00004 #include <unistd.h>
00005
00006 #include "toolbox/task/WorkLoopFactory.h"
00007 #include "xcept/tools.h"
00008
00009 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00010 #include "EventFilter/StorageManager/interface/Exception.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/I2OChain.h"
00013 #include "EventFilter/StorageManager/interface/QueueID.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00016
00017 using namespace stor;
00018
00019
00020 FragmentProcessor::FragmentProcessor( xdaq::Application *app,
00021 SharedResourcesPtr sr ) :
00022 app_(app),
00023 sharedResources_(sr),
00024 wrapperNotifier_( app ),
00025 fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
00026 eventDistributor_(sr),
00027 actionIsActive_(true)
00028 {
00029 stateMachine_.reset( new StateMachine( &eventDistributor_,
00030 &fragmentStore_, &wrapperNotifier_,
00031 sharedResources_ ) );
00032 stateMachine_->initiate();
00033
00034 WorkerThreadParams workerParams =
00035 sharedResources_->configuration_->getWorkerThreadParams();
00036 timeout_ = workerParams.FPdeqWaitTime_;
00037 }
00038
00039 FragmentProcessor::~FragmentProcessor()
00040 {
00041
00042 actionIsActive_ = false;
00043
00044
00045 processWL_->cancel();
00046 }
00047
00048 void FragmentProcessor::startWorkLoop(std::string workloopName)
00049 {
00050 try
00051 {
00052 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00053
00054 processWL_ = toolbox::task::getWorkLoopFactory()->
00055 getWorkLoop( identifier + workloopName, "waiting" );
00056
00057 if ( ! processWL_->isActive() )
00058 {
00059 toolbox::task::ActionSignature* processAction =
00060 toolbox::task::bind(this, &FragmentProcessor::processMessages,
00061 identifier + "ProcessMessages");
00062 processWL_->submit(processAction);
00063
00064 processWL_->activate();
00065 }
00066 }
00067 catch (xcept::Exception& e)
00068 {
00069 std::string msg = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
00070 XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, e);
00071 }
00072 }
00073
00074 bool FragmentProcessor::processMessages(toolbox::task::WorkLoop*)
00075 {
00076 std::string errorMsg;
00077
00078 try
00079 {
00080 errorMsg = "Failed to process state machine events: ";
00081 processAllCommands();
00082
00083 errorMsg = "Failed to process consumer registrations: ";
00084 processAllRegistrations();
00085
00086 errorMsg = "Failed to process an event fragment: ";
00087 processOneFragmentIfPossible();
00088 }
00089 catch(stor::exception::RBLookupFailed &e)
00090 {
00091 sharedResources_->alarmHandler_->
00092 notifySentinel(AlarmHandler::ERROR, e);
00093 }
00094 catch(xcept::Exception &e)
00095 {
00096 XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
00097 sentinelException, errorMsg, e );
00098 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00099 }
00100 catch(std::exception &e)
00101 {
00102 errorMsg += e.what();
00103 XCEPT_DECLARE(stor::exception::FragmentProcessing,
00104 sentinelException, errorMsg);
00105 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00106 }
00107 catch(...)
00108 {
00109 errorMsg += "Unknown exception";
00110 XCEPT_DECLARE(stor::exception::FragmentProcessing,
00111 sentinelException, errorMsg);
00112 sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00113 }
00114
00115 return actionIsActive_;
00116 }
00117
00118 void FragmentProcessor::processOneFragmentIfPossible()
00119 {
00120 if (fragmentStore_.full() || eventDistributor_.full())
00121 {
00122 utils::TimePoint_t startTime = utils::getCurrentTime();
00123
00124 utils::sleep(timeout_);
00125
00126 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00127 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00128 addFragmentProcessorIdleSample(elapsedTime);
00129
00130 fragmentStore_.addToStaleEventTimes(elapsedTime);
00131 }
00132 else
00133 processOneFragment();
00134 }
00135
00136 void FragmentProcessor::processOneFragment()
00137 {
00138 I2OChain fragment;
00139 FragmentQueuePtr fq = sharedResources_->fragmentQueue_;
00140 utils::TimePoint_t startTime = utils::getCurrentTime();
00141 if (fq->deqTimedWait(fragment, timeout_))
00142 {
00143 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00144 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00145 addFragmentProcessorIdleSample(elapsedTime);
00146 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00147 addPoppedFragmentSample(fragment.memoryUsed());
00148
00149 stateMachine_->getCurrentState().processI2OFragment(fragment);
00150 }
00151 else
00152 {
00153 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00154 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00155 addFragmentProcessorIdleSample(elapsedTime);
00156
00157 stateMachine_->getCurrentState().noFragmentToProcess();
00158 }
00159 }
00160
00161
00162 void FragmentProcessor::processAllCommands()
00163 {
00164 CommandQueuePtr cq = sharedResources_->commandQueue_;
00165 stor::EventPtr_t evt;
00166 bool gotCommand = false;
00167
00168 while( cq->deqNowait( evt ) )
00169 {
00170 gotCommand = true;
00171 stateMachine_->process_event( *evt );
00172 }
00173
00174
00175
00176 if (gotCommand)
00177 {
00178 WorkerThreadParams workerParams =
00179 sharedResources_->configuration_->getWorkerThreadParams();
00180 timeout_ = workerParams.FPdeqWaitTime_;
00181 }
00182 }
00183
00184
00185 void FragmentProcessor::processAllRegistrations()
00186 {
00187 RegPtr regPtr;
00188 RegistrationQueuePtr regQueue =
00189 sharedResources_->registrationQueue_;
00190 while ( regQueue->deqNowait( regPtr ) )
00191 {
00192 regPtr->registerMe( &eventDistributor_ );
00193 }
00194 }
00195
00196