CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_3/src/EventFilter/StorageManager/src/FragmentProcessor.cc

Go to the documentation of this file.
00001 // $Id: FragmentProcessor.cc,v 1.20 2011/11/08 10:48:40 mommsen Exp $
00003 
00004 #include <unistd.h>
00005 
00006 #include "toolbox/task/WorkLoopFactory.h"
00007 #include "xcept/tools.h"
00008 
00009 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00010 #include "EventFilter/StorageManager/interface/Exception.h"
00011 #include "EventFilter/StorageManager/interface/FragmentProcessor.h"
00012 #include "EventFilter/StorageManager/interface/I2OChain.h"
00013 #include "EventFilter/StorageManager/interface/QueueID.h"
00014 #include "EventFilter/StorageManager/interface/StateMachine.h"
00015 #include "EventFilter/StorageManager/interface/StatisticsReporter.h"
00016 
00017 using namespace stor;
00018 
00019 
00020 FragmentProcessor::FragmentProcessor( xdaq::Application *app,
00021                                       SharedResourcesPtr sr ) :
00022   app_(app),
00023   sharedResources_(sr),
00024   wrapperNotifier_( app ),
00025   fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
00026   eventDistributor_(sr),
00027   actionIsActive_(true)
00028 {
00029   stateMachine_.reset( new StateMachine( &eventDistributor_,
00030                                          &fragmentStore_, &wrapperNotifier_,
00031                                          sharedResources_ ) );
00032   stateMachine_->initiate();
00033 
00034   WorkerThreadParams workerParams =
00035     sharedResources_->configuration_->getWorkerThreadParams();
00036   timeout_ = workerParams.FPdeqWaitTime_;
00037 }
00038 
00039 FragmentProcessor::~FragmentProcessor()
00040 {
00041   // Stop the activity
00042   actionIsActive_ = false;
00043 
00044   // Cancel the workloop (will wait until the action has finished)
00045   processWL_->cancel();
00046 }
00047 
00048 void FragmentProcessor::startWorkLoop(std::string workloopName)
00049 {
00050   try
00051     {
00052       std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
00053 
00054       processWL_ = toolbox::task::getWorkLoopFactory()->
00055         getWorkLoop( identifier + workloopName, "waiting" );
00056 
00057       if ( ! processWL_->isActive() )
00058         {
00059           toolbox::task::ActionSignature* processAction = 
00060             toolbox::task::bind(this, &FragmentProcessor::processMessages, 
00061                                 identifier + "ProcessMessages");
00062           processWL_->submit(processAction);
00063 
00064           processWL_->activate();
00065         }
00066     }
00067   catch (xcept::Exception& e)
00068     {
00069       std::string msg = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
00070     XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, e);
00071   }
00072 }
00073 
00074 bool FragmentProcessor::processMessages(toolbox::task::WorkLoop*)
00075 {
00076   std::string errorMsg;
00077 
00078   try
00079   {
00080     errorMsg = "Failed to process state machine events: ";
00081     processAllCommands();
00082     
00083     errorMsg = "Failed to process consumer registrations: ";
00084     processAllRegistrations();
00085     
00086     errorMsg = "Failed to process an event fragment: ";
00087     processOneFragmentIfPossible();
00088   }
00089   catch(stor::exception::RBLookupFailed &e)
00090   {
00091     sharedResources_->alarmHandler_->
00092       notifySentinel(AlarmHandler::ERROR, e);
00093   }
00094   catch(xcept::Exception &e)
00095   {
00096     XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
00097                           sentinelException, errorMsg, e );
00098     sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00099   }
00100   catch(std::exception &e)
00101   {
00102     errorMsg += e.what();
00103     XCEPT_DECLARE(stor::exception::FragmentProcessing,
00104       sentinelException, errorMsg);
00105     sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00106   }
00107   catch(...)
00108   {
00109     errorMsg += "Unknown exception";
00110     XCEPT_DECLARE(stor::exception::FragmentProcessing,
00111       sentinelException, errorMsg);
00112     sharedResources_->alarmHandler_->moveToFailedState(sentinelException);
00113   }
00114 
00115   return actionIsActive_;
00116 }
00117 
00118 void FragmentProcessor::processOneFragmentIfPossible()
00119 {
00120   if (fragmentStore_.full() || eventDistributor_.full()) 
00121   {
00122     utils::TimePoint_t startTime = utils::getCurrentTime();
00123 
00124     utils::sleep(timeout_);
00125 
00126     utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00127     sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00128       addFragmentProcessorIdleSample(elapsedTime);
00129 
00130     fragmentStore_.addToStaleEventTimes(elapsedTime);
00131   }
00132   else 
00133     processOneFragment();
00134 }
00135 
00136 void FragmentProcessor::processOneFragment()
00137 {
00138   I2OChain fragment;
00139   FragmentQueuePtr fq = sharedResources_->fragmentQueue_;
00140   utils::TimePoint_t startTime = utils::getCurrentTime();
00141   if (fq->deqTimedWait(fragment, timeout_))
00142     {
00143       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00144       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00145         addFragmentProcessorIdleSample(elapsedTime);
00146       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00147         addPoppedFragmentSample(fragment.memoryUsed());
00148 
00149       stateMachine_->getCurrentState().processI2OFragment(fragment);
00150     }
00151   else
00152     {
00153       utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
00154       sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
00155         addFragmentProcessorIdleSample(elapsedTime);
00156 
00157       stateMachine_->getCurrentState().noFragmentToProcess();  
00158     }
00159 }
00160 
00161 
00162 void FragmentProcessor::processAllCommands()
00163 {
00164   CommandQueuePtr cq = sharedResources_->commandQueue_;
00165   stor::EventPtr_t evt;
00166   bool gotCommand = false;
00167 
00168   while( cq->deqNowait( evt ) )
00169     {
00170       gotCommand = true;
00171       stateMachine_->process_event( *evt );
00172     }
00173 
00174   // the timeout value may have changed if the transition was
00175   // a Configuration transition, so check for a new value here
00176   if (gotCommand)
00177     {
00178       WorkerThreadParams workerParams =
00179         sharedResources_->configuration_->getWorkerThreadParams();
00180       timeout_ = workerParams.FPdeqWaitTime_;
00181     }
00182 }
00183 
00184 
00185 void FragmentProcessor::processAllRegistrations()
00186 {
00187   RegPtr regPtr;
00188   RegistrationQueuePtr regQueue =
00189     sharedResources_->registrationQueue_;
00190   while ( regQueue->deqNowait( regPtr ) )
00191   {
00192     regPtr->registerMe( &eventDistributor_ );
00193   }
00194 }
00195 
00196