CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/EventFilter/StorageManager/src/FragmentProcessor.cc

Go to the documentation of this file.
00001 // $Id: FragmentProcessor.cc,v 1.19 2011/03/30 15:16:48 mommsen Exp $
00003 
00004 #include <unistd.h>
00005 
00006 #include "toolbox/task/WorkLoopFactory.h"
00007 #include "xcept/tools.h"
00008 
00009 #include "EventFilter/StorageManager/interface/Exception.h"
00010 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00011 #include "EventFilter/StorageManager/interface/I2OChain.h"
00012 #include "EventFilter/StorageManager/interface/QueueID.h"
00013 #include "EventFilter/StorageManager/interface/StateMachine.h"
00014 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00015 
00016 using namespace stor;
00017 
00018 
00019 FragmentProcessor::FragmentProcessor( xdaq::Application *app,
00020                                       SharedResourcesPtr sr ) :
00021   app_(app),
00022   sharedResources_(sr),
00023   wrapperNotifier_( app ),
00024   fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
00025   eventDistributor_(sr),
00026   actionIsActive_(true)
00027 {
00028   stateMachine_.reset( new StateMachine( &eventDistributor_,
00029                                          &fragmentStore_, &wrapperNotifier_,
00030                                          sharedResources_ ) );
00031   stateMachine_->initiate();
00032 
00033   WorkerThreadParams workerParams =
00034     sharedResources_->configuration_->getWorkerThreadParams();
00035   timeout_ = workerParams.FPdeqWaitTime_;
00036 }
00037 
00038 FragmentProcessor::~FragmentProcessor()
00039 {
00040   // Stop the activity
00041   actionIsActive_ = false;
00042 
00043   // Cancel the workloop (will wait until the action has finished)
00044   processWL_->cancel();
00045 }
00046 
00047 void FragmentProcessor::startWorkLoop(std::string workloopName)
00048 {
00049   try
00050     {
00051       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00052 
00053       processWL_ = toolbox::task::getWorkLoopFactory()->
00054         getWorkLoop( identifier + workloopName, "waiting" );
00055 
00056       if ( ! processWL_->isActive() )
00057         {
00058           toolbox::task::ActionSignature* processAction = 
00059             toolbox::task::bind(this, &FragmentProcessor::processMessages, 
00060                                 identifier + "ProcessMessages");
00061           processWL_->submit(processAction);
00062 
00063           processWL_->activate();
00064         }
00065     }
00066   catch (xcept::Exception& e)
00067     {
00068       std::string msg = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
00069     XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, e);
00070   }
00071 }
00072 
00073 bool FragmentProcessor::processMessages(toolbox::task::WorkLoop*)
00074 {
00075   std::string errorMsg;
00076 
00077   try
00078   {
00079     errorMsg = "Failed to process state machine events: ";
00080     processAllCommands();
00081     
00082     errorMsg = "Failed to process consumer registrations: ";
00083     processAllRegistrations();
00084     
00085     errorMsg = "Failed to process an event fragment: ";
00086     processOneFragmentIfPossible();
00087   }
00088   catch(stor::exception::RBLookupFailed &e)
00089   {
00090     sharedResources_->statisticsReporter_->alarmHandler()->
00091       notifySentinel(AlarmHandler::ERROR, e);
00092   }
00093   catch(xcept::Exception &e)
00094   {
00095     XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
00096                           sentinelException, errorMsg, e );
00097     sharedResources_->moveToFailedState(sentinelException);
00098   }
00099   catch(std::exception &e)
00100   {
00101     errorMsg += e.what();
00102     XCEPT_DECLARE(stor::exception::FragmentProcessing,
00103       sentinelException, errorMsg);
00104     sharedResources_->moveToFailedState(sentinelException);
00105   }
00106   catch(...)
00107   {
00108     errorMsg += "Unknown exception";
00109     XCEPT_DECLARE(stor::exception::FragmentProcessing,
00110       sentinelException, errorMsg);
00111     sharedResources_->moveToFailedState(sentinelException);
00112   }
00113 
00114   return actionIsActive_;
00115 }
00116 
00117 void FragmentProcessor::processOneFragmentIfPossible()
00118 {
00119   if (fragmentStore_.full() || eventDistributor_.full()) 
00120   {
00121     utils::TimePoint_t startTime = utils::getCurrentTime();
00122 
00123     utils::sleep(timeout_);
00124 
00125     utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00126     sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00127       addFragmentProcessorIdleSample(elapsedTime);
00128 
00129     fragmentStore_.addToStaleEventTimes(elapsedTime);
00130   }
00131   else 
00132     processOneFragment();
00133 }
00134 
00135 void FragmentProcessor::processOneFragment()
00136 {
00137   I2OChain fragment;
00138   FragmentQueuePtr fq = sharedResources_->fragmentQueue_;
00139   utils::TimePoint_t startTime = utils::getCurrentTime();
00140   if (fq->deqTimedWait(fragment, timeout_))
00141     {
00142       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00143       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00144         addFragmentProcessorIdleSample(elapsedTime);
00145       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00146         addPoppedFragmentSample(fragment.memoryUsed());
00147 
00148       stateMachine_->getCurrentState().processI2OFragment(fragment);
00149     }
00150   else
00151     {
00152       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00153       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00154         addFragmentProcessorIdleSample(elapsedTime);
00155 
00156       stateMachine_->getCurrentState().noFragmentToProcess();  
00157     }
00158 }
00159 
00160 
00161 void FragmentProcessor::processAllCommands()
00162 {
00163   CommandQueuePtr cq = sharedResources_->commandQueue_;
00164   stor::EventPtr_t evt;
00165   bool gotCommand = false;
00166 
00167   while( cq->deqNowait( evt ) )
00168     {
00169       gotCommand = true;
00170       stateMachine_->process_event( *evt );
00171     }
00172 
00173   // the timeout value may have changed if the transition was
00174   // a Configuration transition, so check for a new value here
00175   if (gotCommand)
00176     {
00177       WorkerThreadParams workerParams =
00178         sharedResources_->configuration_->getWorkerThreadParams();
00179       timeout_ = workerParams.FPdeqWaitTime_;
00180     }
00181 }
00182 
00183 
00184 void FragmentProcessor::processAllRegistrations()
00185 {
00186   RegPtr regPtr;
00187   RegistrationQueuePtr regQueue =
00188     sharedResources_->registrationQueue_;
00189   while ( regQueue->deqNowait( regPtr ) )
00190   {
00191     regPtr->registerMe( &eventDistributor_ );
00192   }
00193 }
00194 
00195