CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

stor::DQMEventProcessor Class Reference

#include <DQMEventProcessor.h>

List of all members.

Public Member Functions

 DQMEventProcessor (xdaq::Application *, SharedResourcesPtr sr)
bool processDQMEvents (toolbox::task::WorkLoop *)
void startWorkLoop (std::string workloopName)
 ~DQMEventProcessor ()

Private Member Functions

 DQMEventProcessor (DQMEventProcessor const &)
void endOfRun ()
DQMEventProcessoroperator= (DQMEventProcessor const &)
void processNextDQMEvent ()

Private Attributes

bool actionIsActive_
xdaq::Application * app_
unsigned int discardDQMUpdatesForOlderLS_
DQMEventStore< I2OChain,
InitMsgCollection,
SharedResources
dqmEventStore_
uint32_t latestLumiSection_
toolbox::task::WorkLoop * processWL_
SharedResourcesPtr sharedResources_
boost::posix_time::time_duration timeout_

Detailed Description

Processes the DQM event (histograms)

It retrieves the next DQM event from the DQMEventQueue, adds up the histograms belonging to one lumi-section, and puts it into the appropriate DQMConsumerQueues. Depending on the configuration, it also writes the histograms to disk every N lumi-sections.

Author:
mommsen
Revision:
1.7
Date:
2011/04/19 16:01:55

Definition at line 38 of file DQMEventProcessor.h.


Constructor & Destructor Documentation

stor::DQMEventProcessor::DQMEventProcessor ( xdaq::Application *  app,
SharedResourcesPtr  sr 
)

Definition at line 34 of file DQMEventProcessor.cc.

References stor::WorkerThreadParams::DQMEPdeqWaitTime_, sharedResources_, and timeout_.

                                                                                  :
  app_(app),
  sharedResources_(sr),
  actionIsActive_(true),
  latestLumiSection_(0),
  discardDQMUpdatesForOlderLS_(0),
  dqmEventStore_
  (
    app->getApplicationDescriptor(),
    sr->dqmEventQueueCollection_,
    sr->statisticsReporter_->getDQMEventMonitorCollection(),
    sr->initMsgCollection_.get(),
    &stor::InitMsgCollection::maxMsgCount,
    sr.get(),
    &stor::SharedResources::moveToFailedState,
    sr->statisticsReporter_->alarmHandler()
  )
  {
    WorkerThreadParams workerParams =
      sharedResources_->configuration_->getWorkerThreadParams();
    timeout_ = workerParams.DQMEPdeqWaitTime_;
  }
stor::DQMEventProcessor::~DQMEventProcessor ( )

Definition at line 58 of file DQMEventProcessor.cc.

References actionIsActive_, and processWL_.

  {
    // Stop the activity
    actionIsActive_ = false;
    
    // Cancel the workloop (will wait until the action has finished)
    processWL_->cancel();
  }
stor::DQMEventProcessor::DQMEventProcessor ( DQMEventProcessor const &  ) [private]

Member Function Documentation

void stor::DQMEventProcessor::endOfRun ( ) [private]

Purge instances and process all completed DQM records

Definition at line 191 of file DQMEventProcessor.cc.

References dqmEventStore_, latestLumiSection_, and stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::purge().

Referenced by processNextDQMEvent().

DQMEventProcessor& stor::DQMEventProcessor::operator= ( DQMEventProcessor const &  ) [private]
bool stor::DQMEventProcessor::processDQMEvents ( toolbox::task::WorkLoop *  )

The workloop action taking the next DQM event from the DQMEventQueue, processes it, and puts it into the appropriate DQMConsumerQueues when the lumi-section has finished.

Definition at line 95 of file DQMEventProcessor.cc.

References actionIsActive_, exception, Exception, processNextDQMEvent(), and sharedResources_.

Referenced by startWorkLoop().

  {
    std::string errorMsg = "Failed to process a DQM event: ";
    
    try
    {
      processNextDQMEvent();
    }
    catch(xcept::Exception &e)
    {
      XCEPT_DECLARE_NESTED( stor::exception::DQMEventProcessing,
        sentinelException, errorMsg, e );
      sharedResources_->moveToFailedState(sentinelException);
    }
    catch(std::exception &e)
    {
      errorMsg += e.what();
      XCEPT_DECLARE( stor::exception::DQMEventProcessing,
        sentinelException, errorMsg );
      sharedResources_->moveToFailedState(sentinelException);
    }
    catch(...)
    {
      errorMsg += "Unknown exception";
      XCEPT_DECLARE( stor::exception::DQMEventProcessing,
        sentinelException, errorMsg );
      sharedResources_->moveToFailedState(sentinelException);
    }
    
    return actionIsActive_;
  }
void stor::DQMEventProcessor::processNextDQMEvent ( ) [private]

Pops the next DQM event from the DQMEventQueue and adds it to the DQMStore

Definition at line 128 of file DQMEventProcessor.cc.

References stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::addDQMEvent(), stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::clear(), stor::DQMEventProcessorResources::Requests::configuration, stor::DQMProcessingParams::discardDQMUpdatesForOlderLS_, discardDQMUpdatesForOlderLS_, dqmEventStore_, endOfRun(), stor::DQMEventProcessorResources::Requests::endOfRun, python::Vispa::Plugins::EdmBrowser::EdmDataAccessor::eq(), stor::utils::getCurrentTime(), latestLumiSection_, max(), stor::DQMEventStore< EventType, ConnectionType, StateMachineType >::setParameters(), sharedResources_, stor::DQMEventProcessorResources::Requests::storeDestruction, and timeout_.

Referenced by processDQMEvents().

  {
    DQMEventQueue::ValueType dqmEvent;
    DQMEventQueuePtr eq = sharedResources_->dqmEventQueue_;
    utils::TimePoint_t startTime = utils::getCurrentTime();
    if (eq->deqTimedWait(dqmEvent, timeout_))
    {
      utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
      sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
        addDQMEventProcessorIdleSample(elapsedTime);

      if (
        (discardDQMUpdatesForOlderLS_ > 0) &&
        (dqmEvent.first.lumiSection() + discardDQMUpdatesForOlderLS_ < latestLumiSection_)
      )
        // subtracting unsigned quantities might not yield the right result!
      {
        // discard very old LS
        sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
          getDroppedDQMEventCountsMQ().addSample(dqmEvent.second + 1);        
      }
      else
      {
        sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
          addPoppedDQMEventSample(dqmEvent.first.memoryUsed());
        sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
          getDroppedDQMEventCountsMQ().addSample(dqmEvent.second);
        
        latestLumiSection_ = std::max(latestLumiSection_, dqmEvent.first.lumiSection());
        dqmEventStore_.addDQMEvent(dqmEvent.first);
      }
    }
    else
    {
      utils::Duration_t elapsedTime = utils::getCurrentTime() - startTime;
      sharedResources_->statisticsReporter_->getThroughputMonitorCollection().
        addDQMEventProcessorIdleSample(elapsedTime);
    }
    
    DQMEventProcessorResources::Requests requests;
    DQMProcessingParams dqmParams;
    boost::posix_time::time_duration newTimeoutValue;
    if (sharedResources_->dqmEventProcessorResources_->
      getRequests(requests, dqmParams, newTimeoutValue))
    {
      if (requests.configuration)
      {
        timeout_ = newTimeoutValue;
        dqmEventStore_.setParameters(dqmParams);
        discardDQMUpdatesForOlderLS_ = dqmParams.discardDQMUpdatesForOlderLS_;
      }
      if (requests.endOfRun)
      {
        endOfRun();
      }
      if (requests.storeDestruction)
      {
        dqmEventStore_.clear();
      }
      sharedResources_->dqmEventProcessorResources_->requestsDone();
    }
  }
void stor::DQMEventProcessor::startWorkLoop ( std::string  workloopName)

Creates and starts the DQM event processing workloop

Definition at line 68 of file DQMEventProcessor.cc.

References app_, Exception, stor::utils::getIdentifier(), runTheMatrix::msg, processDQMEvents(), and processWL_.

  {
    try
    {
      std::string identifier = utils::getIdentifier(app_->getApplicationDescriptor());
      
      processWL_ = toolbox::task::getWorkLoopFactory()->
      getWorkLoop( identifier + workloopName, "waiting" );
      
      if ( ! processWL_->isActive() )
      {
        toolbox::task::ActionSignature* processAction = 
          toolbox::task::bind(this, &DQMEventProcessor::processDQMEvents,
          identifier + "ProcessNextDQMEvent");
        processWL_->submit(processAction);
        
        processWL_->activate();
      }
    }
    catch (xcept::Exception& e)
    {
      std::string msg = "Failed to start workloop 'DQMEventProcessor' with 'processNextDQMEvent'.";
      XCEPT_RETHROW(stor::exception::DQMEventProcessing, msg, e);
    }
  }

Member Data Documentation

Definition at line 82 of file DQMEventProcessor.h.

Referenced by processDQMEvents(), and ~DQMEventProcessor().

xdaq::Application* stor::DQMEventProcessor::app_ [private]

Definition at line 78 of file DQMEventProcessor.h.

Referenced by startWorkLoop().

Definition at line 84 of file DQMEventProcessor.h.

Referenced by processNextDQMEvent().

Definition at line 88 of file DQMEventProcessor.h.

Referenced by endOfRun(), and processNextDQMEvent().

Definition at line 83 of file DQMEventProcessor.h.

Referenced by endOfRun(), and processNextDQMEvent().

toolbox::task::WorkLoop* stor::DQMEventProcessor::processWL_ [private]

Definition at line 86 of file DQMEventProcessor.h.

Referenced by startWorkLoop(), and ~DQMEventProcessor().

Definition at line 79 of file DQMEventProcessor.h.

Referenced by DQMEventProcessor(), processDQMEvents(), and processNextDQMEvent().

boost::posix_time::time_duration stor::DQMEventProcessor::timeout_ [private]

Definition at line 81 of file DQMEventProcessor.h.

Referenced by DQMEventProcessor(), and processNextDQMEvent().