CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

stor::FragmentProcessor Class Reference

#include <FragmentProcessor.h>

List of all members.

Public Member Functions

 FragmentProcessor (xdaq::Application *app, SharedResourcesPtr sr)
bool processMessages (toolbox::task::WorkLoop *)
void startWorkLoop (std::string workloopName)
 ~FragmentProcessor ()

Private Member Functions

void processAllCommands ()
void processAllRegistrations ()
void processOneFragment ()
void processOneFragmentIfPossible ()

Private Attributes

bool actionIsActive_
xdaq::Application * app_
EventDistributor eventDistributor_
FragmentStore fragmentStore_
toolbox::task::WorkLoop * processWL_
SharedResourcesPtr sharedResources_
StateMachinePtr stateMachine_
boost::posix_time::time_duration timeout_
WrapperNotifier wrapperNotifier_

Detailed Description

Processes I2O event fragments

It pops the next fragment from the FragmentQueue and adds it to the FragmentStore. If this completes the event, it hands it to the EventDistributor.

Author:
mommsen
Revision:
1.6
Date:
2011/03/07 15:31:32

Definition at line 40 of file FragmentProcessor.h.


Constructor & Destructor Documentation

FragmentProcessor::FragmentProcessor ( xdaq::Application *  app,
SharedResourcesPtr  sr 
)

Definition at line 20 of file FragmentProcessor.cc.

References eventDistributor_, stor::WorkerThreadParams::FPdeqWaitTime_, fragmentStore_, sharedResources_, stateMachine_, timeout_, and wrapperNotifier_.

                                                              :
  app_(app),
  sharedResources_(sr),
  wrapperNotifier_( app ),
  fragmentStore_(sr->configuration_->getQueueConfigurationParams().fragmentStoreMemoryLimitMB_),
  eventDistributor_(sr),
  actionIsActive_(true),
  // No workloop exists until startWorkLoop() assigns one. Give the raw
  // pointer a defined (null) value so it is never left indeterminate;
  // the destructor reads it even if startWorkLoop() was never called.
  processWL_(0)
{
  // The state machine operates on the members constructed above, so it
  // is created only after the initialiser list has run.
  stateMachine_.reset( new StateMachine( &eventDistributor_,
                                         &fragmentStore_, &wrapperNotifier_,
                                         sharedResources_ ) );
  stateMachine_->initiate();

  // Cache the fragment-queue dequeue wait time from the current
  // worker thread configuration.
  WorkerThreadParams workerParams =
    sharedResources_->configuration_->getWorkerThreadParams();
  timeout_ = workerParams.FPdeqWaitTime_;
}
FragmentProcessor::~FragmentProcessor ( )

Definition at line 39 of file FragmentProcessor.cc.

References actionIsActive_, and processWL_.

{
  // Stop the activity: processMessages() returns this flag to the workloop
  actionIsActive_ = false;

  // Only cancel a workloop that was actually obtained.
  // NOTE(review): this guard is effective only if processWL_ is null
  // whenever startWorkLoop() has not assigned it - confirm that the
  // constructor initialises the pointer.
  if ( processWL_ )
    {
      try
        {
          // Cancel the workloop (will wait until the action has finished)
          processWL_->cancel();
        }
      catch(...)
        {
          // Deliberately contained: an exception escaping a destructor
          // during stack unwinding terminates the process, and there is
          // nothing left to recover at this point.
          // NOTE(review): XDAQ declares WorkLoop::cancel() as able to
          // throw - confirm against the toolbox headers.
        }
    }
}

Member Function Documentation

void FragmentProcessor::processAllCommands ( ) [private]

Processes all state machine events in the command queue

Definition at line 162 of file FragmentProcessor.cc.

References stor::WorkerThreadParams::FPdeqWaitTime_, sharedResources_, stateMachine_, and timeout_.

Referenced by processMessages().

{
  CommandQueuePtr commandQueue = sharedResources_->commandQueue_;
  stor::EventPtr_t pendingEvent;
  bool handledAny = false;

  // Feed queued events to the state machine until deqNowait()
  // reports that nothing more is available
  while ( commandQueue->deqNowait( pendingEvent ) )
  {
    stateMachine_->process_event( *pendingEvent );
    handledAny = true;
  }

  // Nothing was processed, so the cached timeout cannot be outdated
  if ( !handledAny ) return;

  // One of the processed events may have been a Configuration
  // transition, which can change the dequeue wait time: re-read it
  timeout_ =
    sharedResources_->configuration_->getWorkerThreadParams().FPdeqWaitTime_;
}
void FragmentProcessor::processAllRegistrations ( ) [private]

Processes all consumer registrations in the registration queue

Definition at line 185 of file FragmentProcessor.cc.

References eventDistributor_, and sharedResources_.

Referenced by processMessages().

{
  RegPtr registration;
  RegistrationQueuePtr pendingRegistrations = sharedResources_->registrationQueue_;

  // Let every queued registration register itself with the event
  // distributor; stop as soon as deqNowait() delivers nothing
  for ( ; pendingRegistrations->deqNowait( registration ); )
    registration->registerMe( &eventDistributor_ );
}
bool FragmentProcessor::processMessages ( toolbox::task::WorkLoop *  )

The workloop action: it processes state machine commands from the command queue and handles I2O messages retrieved from the FragmentQueue.

Definition at line 74 of file FragmentProcessor.cc.

References actionIsActive_, alignCSCRings::e, stor::AlarmHandler::ERROR, exception, Exception, processAllCommands(), processAllRegistrations(), processOneFragmentIfPossible(), sharedResources_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by startWorkLoop().

{
  // Names the step currently running, so that a failure caught below
  // can be attributed to it
  std::string failureContext;

  try
  {
    failureContext = "Failed to process state machine events: ";
    processAllCommands();

    failureContext = "Failed to process consumer registrations: ";
    processAllRegistrations();

    failureContext = "Failed to process an event fragment: ";
    processOneFragmentIfPossible();
  }
  catch ( stor::exception::RBLookupFailed &lookupFailure )
  {
    // Raised as an ERROR alarm only; unlike the handlers below this
    // one does not move the application to the failed state
    sharedResources_->alarmHandler_->notifySentinel( AlarmHandler::ERROR,
                                                     lookupFailure );
  }
  catch ( xcept::Exception &xdaqFailure )
  {
    // Keep the original exception nested inside the new one
    XCEPT_DECLARE_NESTED( stor::exception::FragmentProcessing,
                          wrappedException, failureContext, xdaqFailure );
    sharedResources_->alarmHandler_->moveToFailedState( wrappedException );
  }
  catch ( std::exception &stdFailure )
  {
    failureContext += stdFailure.what();
    XCEPT_DECLARE( stor::exception::FragmentProcessing,
                   wrappedException, failureContext );
    sharedResources_->alarmHandler_->moveToFailedState( wrappedException );
  }
  catch ( ... )
  {
    failureContext += "Unknown exception";
    XCEPT_DECLARE( stor::exception::FragmentProcessing,
                   wrappedException, failureContext );
    sharedResources_->alarmHandler_->moveToFailedState( wrappedException );
  }

  // The destructor clears this flag; its value is what the workloop
  // receives as the result of this action
  return actionIsActive_;
}
void FragmentProcessor::processOneFragment ( ) [private]

Process a single fragment. This should only be called if it has already been determined there is a place to put it.

Definition at line 136 of file FragmentProcessor.cc.

References stor::utils::getCurrentTime(), stor::I2OChain::memoryUsed(), sharedResources_, stateMachine_, and timeout_.

Referenced by processOneFragmentIfPossible().

{
  I2OChain chain;
  FragmentQueuePtr fragmentQueue = sharedResources_->fragmentQueue_;

  // Measure the time spent in the dequeue call, whether or not it
  // delivered a fragment
  utils::TimePoint_t waitBegin = utils::getCurrentTime();
  const bool received = fragmentQueue->deqTimedWait(chain, timeout_);
  utils::Duration_t waited = utils::getCurrentTime() - waitBegin;

  // The wait is booked as idle time in both outcomes
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
    addFragmentProcessorIdleSample(waited);

  if (!received)
    {
      // Nothing arrived: let the current state react to that
      stateMachine_->getCurrentState().noFragmentToProcess();
      return;
    }

  // Record the size of the popped fragment, then let the current
  // state of the state machine handle it
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
    addPoppedFragmentSample(chain.memoryUsed());
  stateMachine_->getCurrentState().processI2OFragment(chain);
}
void FragmentProcessor::processOneFragmentIfPossible ( ) [private]

Process a single fragment, if there is a place to put it.

Definition at line 118 of file FragmentProcessor.cc.

References stor::FragmentStore::addToStaleEventTimes(), eventDistributor_, fragmentStore_, stor::FragmentStore::full(), stor::EventDistributor::full(), stor::utils::getCurrentTime(), processOneFragment(), sharedResources_, stor::utils::sleep(), and timeout_.

Referenced by processMessages().

{
  // Normal path: neither the fragment store nor the event distributor
  // reports being full, so a fragment can be taken from the queue
  if (!fragmentStore_.full() && !eventDistributor_.full())
  {
    processOneFragment();
    return;
  }

  // Back-pressure path: sleep for one timeout period instead of
  // dequeuing, and measure how long that actually took
  utils::TimePoint_t sleepBegin = utils::getCurrentTime();
  utils::sleep(timeout_);
  utils::Duration_t slept = utils::getCurrentTime() - sleepBegin;

  // Book the pause as idle time ...
  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
    addFragmentProcessorIdleSample(slept);

  // ... and pass it on to the fragment store's stale event times
  fragmentStore_.addToStaleEventTimes(slept);
}
void FragmentProcessor::startWorkLoop ( std::string  workloopName)

Create and start the fragment processing workloop

Definition at line 48 of file FragmentProcessor.cc.

References app_, alignCSCRings::e, Exception, stor::utils::getIdentifier(), lumiQueryAPI::msg, processMessages(), processWL_, and AlCaHLTBitMon_QueryRunRegistry::string.

{
  try
  {
    // Prefix derived from the application descriptor; it is used for
    // both the workloop name and the action name
    std::string prefix = utils::getIdentifier(app_->getApplicationDescriptor());

    processWL_ = toolbox::task::getWorkLoopFactory()->
      getWorkLoop( prefix + workloopName, "waiting" );

    // A workloop that is already active is left untouched
    if ( processWL_->isActive() ) return;

    // Bind processMessages() as the workloop action, queue it, and
    // start the workloop
    toolbox::task::ActionSignature* action =
      toolbox::task::bind(this, &FragmentProcessor::processMessages,
                          prefix + "ProcessMessages");
    processWL_->submit(action);
    processWL_->activate();
  }
  catch (xcept::Exception& cause)
  {
    // Re-raise as a FragmentProcessing exception carrying the original
    std::string msg = "Failed to start workloop 'FragmentProcessor' with 'processMessages'.";
    XCEPT_RETHROW(stor::exception::FragmentProcessing, msg, cause);
  }
}

Member Data Documentation

Definition at line 92 of file FragmentProcessor.h.

Referenced by processMessages(), and ~FragmentProcessor().

xdaq::Application* stor::FragmentProcessor::app_ [private]

Definition at line 84 of file FragmentProcessor.h.

Referenced by startWorkLoop().

Definition at line 88 of file FragmentProcessor.h.

Referenced by FragmentProcessor(), and processOneFragmentIfPossible().

toolbox::task::WorkLoop* stor::FragmentProcessor::processWL_ [private]

Definition at line 94 of file FragmentProcessor.h.

Referenced by startWorkLoop(), and ~FragmentProcessor().

Definition at line 87 of file FragmentProcessor.h.

Referenced by FragmentProcessor(), processAllCommands(), and processOneFragment().

boost::posix_time::time_duration stor::FragmentProcessor::timeout_ [private]

Definition at line 86 of file FragmentProcessor.h.

Referenced by FragmentProcessor().