Go to the documentation of this file.00001
00003
00004 #include <unistd.h>
00005
00006 #include "toolbox/task/WorkLoopFactory.h"
00007 #include "xcept/tools.h"
00008
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00011 #include "EventFilter/StorageManager/interface/I2OChain.h"
00012 #include "EventFilter/StorageManager/interface/QueueID.h"
00013 #include "EventFilter/StorageManager/interface/StateMachine.h"
00014 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00015
00016 using namespace stor;
00017
00018
00019 FragmentProcessor::FragmentProcessor( xdaq::Application *app,
00020 SharedResourcesPtr sr ) :
00021 app_(app),
00022 sharedResources_(sr),
00023 wrapperNotifier_( app ),
00024 fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
00025 eventDistributor_(sr),
00026 actionIsActive_(true)
00027 {
00028 stateMachine_.reset( new StateMachine( &eventDistributor_,
00029 &fragmentStore_, &wrapperNotifier_,
00030 sharedResources_ ) );
00031 stateMachine_->initiate();
00032
00033 WorkerThreadParams workerParams =
00034 sharedResources_->configuration_->getWorkerThreadParams();
00035 timeout_ = workerParams.FPdeqWaitTime_;
00036 }
00037
00038 FragmentProcessor::~FragmentProcessor()
00039 {
00040
00041 actionIsActive_ = false;
00042
00043
00044 processWL_->cancel();
00045 }
00046
00047 void FragmentProcessor::startWorkLoop(std::string workloopName)
00048 {
00049 try
00050 {
00051 std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00052
00053 processWL_ = toolbox::task::getWorkLoopFactory()->
00054 getWorkLoop( identifier + workloopName, "waiting" );
00055
00056 if ( ! processWL_->isActive() )
00057 {
00058 toolbox::task::ActionSignature* processAction =
00059 toolbox::task::bind(this, &FragmentProcessor::processMessages,
00060 identifier + "ProcessMessages");
00061 processWL_->submit(processAction);
00062
00063 processWL_->activate();
00064 }
00065 }
00066 catch (xcept::Exception& e)
00067 {
00068 std::string msg = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
00069 XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, e);
00070 }
00071 }
00072
00073 bool FragmentProcessor::processMessages(toolbox::task::WorkLoop*)
00074 {
00075 std::string errorMsg;
00076
00077 try
00078 {
00079 errorMsg = "Failed to process state machine events: ";
00080 processAllCommands();
00081
00082 errorMsg = "Failed to process consumer registrations: ";
00083 processAllRegistrations();
00084
00085 errorMsg = "Failed to process an event fragment: ";
00086 processOneFragmentIfPossible();
00087 }
00088 catch(stor::exception::RBLookupFailed &e)
00089 {
00090 sharedResources_->statisticsReporter_->alarmHandler()->
00091 notifySentinel(AlarmHandler::ERROR, e);
00092 }
00093 catch(xcept::Exception &e)
00094 {
00095 XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
00096 sentinelException, errorMsg, e );
00097 sharedResources_->moveToFailedState(sentinelException);
00098 }
00099 catch(std::exception &e)
00100 {
00101 errorMsg += e.what();
00102 XCEPT_DECLARE(stor::exception::FragmentProcessing,
00103 sentinelException, errorMsg);
00104 sharedResources_->moveToFailedState(sentinelException);
00105 }
00106 catch(...)
00107 {
00108 errorMsg += "Unknown exception";
00109 XCEPT_DECLARE(stor::exception::FragmentProcessing,
00110 sentinelException, errorMsg);
00111 sharedResources_->moveToFailedState(sentinelException);
00112 }
00113
00114 return actionIsActive_;
00115 }
00116
00117 void FragmentProcessor::processOneFragmentIfPossible()
00118 {
00119 if (fragmentStore_.full() || eventDistributor_.full())
00120 {
00121 utils::TimePoint_t startTime = utils::getCurrentTime();
00122
00123 utils::sleep(timeout_);
00124
00125 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00126 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00127 addFragmentProcessorIdleSample(elapsedTime);
00128
00129 fragmentStore_.addToStaleEventTimes(elapsedTime);
00130 }
00131 else
00132 processOneFragment();
00133 }
00134
00135 void FragmentProcessor::processOneFragment()
00136 {
00137 I2OChain fragment;
00138 FragmentQueuePtr fq = sharedResources_->fragmentQueue_;
00139 utils::TimePoint_t startTime = utils::getCurrentTime();
00140 if (fq->deqTimedWait(fragment, timeout_))
00141 {
00142 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00143 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00144 addFragmentProcessorIdleSample(elapsedTime);
00145 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00146 addPoppedFragmentSample(fragment.memoryUsed());
00147
00148 stateMachine_->getCurrentState().processI2OFragment(fragment);
00149 }
00150 else
00151 {
00152 utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00153 sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00154 addFragmentProcessorIdleSample(elapsedTime);
00155
00156 stateMachine_->getCurrentState().noFragmentToProcess();
00157 }
00158 }
00159
00160
00161 void FragmentProcessor::processAllCommands()
00162 {
00163 CommandQueuePtr cq = sharedResources_->commandQueue_;
00164 stor::EventPtr_t evt;
00165 bool gotCommand = false;
00166
00167 while( cq->deqNowait( evt ) )
00168 {
00169 gotCommand = true;
00170 stateMachine_->process_event( *evt );
00171 }
00172
00173
00174
00175 if (gotCommand)
00176 {
00177 WorkerThreadParams workerParams =
00178 sharedResources_->configuration_->getWorkerThreadParams();
00179 timeout_ = workerParams.FPdeqWaitTime_;
00180 }
00181 }
00182
00183
00184 void FragmentProcessor::processAllRegistrations()
00185 {
00186 RegPtr regPtr;
00187 RegistrationQueuePtr regQueue =
00188 sharedResources_->registrationQueue_;
00189 while ( regQueue->deqNowait( regPtr ) )
00190 {
00191 regPtr->registerMe( &eventDistributor_ );
00192 }
00193 }
00194
00195