CMS 3D CMS Logo

Public Member Functions | Private Types | Private Member Functions | Private Attributes

stor::EventDistributor Class Reference

#include <EventDistributor.h>

List of all members.

Public Member Functions

void addEventToRelevantQueues (I2OChain &)
void checkForStaleConsumers ()
void clearConsumers ()
void clearStreams ()
unsigned int configuredConsumerCount () const
unsigned int configuredStreamCount () const
 EventDistributor (SharedResourcesPtr sr)
const bool full () const
unsigned int initializedConsumerCount () const
unsigned int initializedStreamCount () const
void registerDQMEventConsumer (const DQMEventConsRegPtr)
void registerErrorStreams (const ErrStrConfigListPtr)
void registerEventConsumer (const EventConsRegPtr)
void registerEventStreams (const EvtStrConfigListPtr)
 ~EventDistributor ()

Private Types

typedef std::set< ConsSelPtr,
utils::ptrComp
< EventConsumerSelector > > 
ConsSelList
typedef boost::shared_ptr
< EventConsumerSelector
ConsSelPtr
typedef std::set< DQMEvtSelPtr,
utils::ptrComp
< DQMEventSelector > > 
DQMEvtSelList
typedef boost::shared_ptr
< DQMEventSelector
DQMEvtSelPtr
typedef std::set< ErrSelPtr,
utils::ptrComp
< ErrorStreamSelector > > 
ErrSelList
typedef boost::shared_ptr
< ErrorStreamSelector
ErrSelPtr
typedef std::set< EvtSelPtr,
utils::ptrComp
< EventStreamSelector > > 
EvtSelList
typedef boost::shared_ptr
< EventStreamSelector
EvtSelPtr

Private Member Functions

void tagCompleteEventForQueues (I2OChain &)

Private Attributes

DQMEvtSelList dqmEventSelectors_
ErrSelList errorStreamSelectors_
ConsSelList eventConsumerSelectors_
EvtSelList eventStreamSelectors_
SharedResourcesPtr sharedResources_

Detailed Description

Distributes complete events to appropriate queues

It receives complete events in form of I2OChains and distributes it to the appropriate queues by checking the I2O message type and the trigger bits in the event header.

Author:
mommsen
Revision:
1.8
Date:
2011/03/07 15:31:31

Definition at line 44 of file EventDistributor.h.


Member Typedef Documentation

Definition at line 137 of file EventDistributor.h.

typedef boost::shared_ptr<EventConsumerSelector> stor::EventDistributor::ConsSelPtr [private]

Definition at line 136 of file EventDistributor.h.

Definition at line 129 of file EventDistributor.h.

typedef boost::shared_ptr<DQMEventSelector> stor::EventDistributor::DQMEvtSelPtr [private]

Definition at line 128 of file EventDistributor.h.

Definition at line 133 of file EventDistributor.h.

typedef boost::shared_ptr<ErrorStreamSelector> stor::EventDistributor::ErrSelPtr [private]

Definition at line 132 of file EventDistributor.h.

Definition at line 125 of file EventDistributor.h.

typedef boost::shared_ptr<EventStreamSelector> stor::EventDistributor::EvtSelPtr [private]

Definition at line 124 of file EventDistributor.h.


Constructor & Destructor Documentation

EventDistributor::EventDistributor ( SharedResourcesPtr  sr)

Definition at line 27 of file EventDistributor.cc.

                                                       :
  sharedResources_(sr)
{}
EventDistributor::~EventDistributor ( )

Definition at line 32 of file EventDistributor.cc.

References clearConsumers(), and clearStreams().


Member Function Documentation

void EventDistributor::addEventToRelevantQueues ( I2OChain ioc)

Add the event given as I2OChain to the appropriate queues

Definition at line 38 of file EventDistributor.cc.

References stor::DataSenderMonitorCollection::addFaultyEventSample(), stor::RunMonitorCollection::addUnwantedEvent(), stor::I2OChain::complete(), stor::AlarmHandler::ERROR, Header::EVENT, stor::FragKey::event_, stor::I2OChain::faulty(), stor::I2OChain::faultyBits(), stor::I2OChain::fragmentKey(), stor::I2OChain::hltURL(), I2O_SM_DATA, I2O_SM_ERROR, stor::I2OChain::i2oMessageCode(), stor::I2OChain::isTaggedForAnyEventConsumer(), stor::I2OChain::isTaggedForAnyStream(), stor::I2OChain::messageCode(), runTheMatrix::msg, stor::I2OChain::rbBufferId(), sharedResources_, tagCompleteEventForQueues(), and stor::I2OChain::tagForStream().

Referenced by stor::DrainingQueues::processStaleFragments().

{
  // special handling for faulty or incomplete events
  if ( ioc.faulty() || !ioc.complete() )
  {
    std::ostringstream msg;
    msg << "Faulty or incomplete I2OChain for event " 
      << ioc.fragmentKey().event_
      << ": 0x" << std::hex << ioc.faultyBits()
      << " received from " << ioc.hltURL()
      << " (rbBufferId " << ioc.rbBufferId() << ").";
    XCEPT_DECLARE( stor::exception::IncompleteEventMessage,
      xcept, msg.str());
    sharedResources_->statisticsReporter_->alarmHandler()->
      notifySentinel(AlarmHandler::ERROR, xcept);

    DataSenderMonitorCollection& dataSenderMonColl =
      sharedResources_->statisticsReporter_->getDataSenderMonitorCollection();
    dataSenderMonColl.addFaultyEventSample(ioc);

    if ( !( sharedResources_->configuration_->getDiskWritingParams().faultyEventsStream_.empty() ) &&
      ( ioc.i2oMessageCode() == I2O_SM_DATA || ioc.i2oMessageCode() == I2O_SM_ERROR) )
      ioc.tagForStream(0); // special stream for faulty events
  }
  else
  {
    tagCompleteEventForQueues( ioc );
  }

  // Check if event belongs here at all:
  bool unexpected = true;

  if( ioc.isTaggedForAnyStream() )
  {
    unexpected = false;
    sharedResources_->streamQueue_->enqWait( ioc );
  }
  
  if( ioc.isTaggedForAnyEventConsumer() )
  {
    unexpected = false;
    sharedResources_->eventQueueCollection_->addEvent( ioc );
  }

  if( unexpected && ioc.messageCode() == Header::EVENT )
  {
    RunMonitorCollection& runMonColl =
      sharedResources_->statisticsReporter_->getRunMonitorCollection();
    runMonColl.addUnwantedEvent(ioc);
  }
}
void EventDistributor::checkForStaleConsumers ( )

Updates staleness info for consumers.

Definition at line 361 of file EventDistributor.cc.

References stor::utils::getCurrentTime(), cmsPerfSuiteHarvest::now, and sharedResources_.

{
  utils::TimePoint_t now = utils::getCurrentTime();

  EventQueueCollectionPtr eqc =
    sharedResources_->eventQueueCollection_;
  eqc->clearStaleQueues(now);

  DQMEventQueueCollectionPtr dqc =
    sharedResources_->dqmEventQueueCollection_;
  dqc->clearStaleQueues(now);
}
void EventDistributor::clearConsumers ( )

Clears out all existing consumer registrations.

Definition at line 333 of file EventDistributor.cc.

References dqmEventSelectors_, and eventConsumerSelectors_.

Referenced by stor::Running::do_entryActionWork(), and ~EventDistributor().

void EventDistributor::clearStreams ( )

Clears out all existing event and error streams.

Definition at line 304 of file EventDistributor.cc.

References errorStreamSelectors_, and eventStreamSelectors_.

Referenced by ~EventDistributor().

unsigned int EventDistributor::configuredConsumerCount ( ) const

Returns the number of consumers that have been configured.

Definition at line 340 of file EventDistributor.cc.

References dqmEventSelectors_, and eventConsumerSelectors_.

{
  return eventConsumerSelectors_.size() + dqmEventSelectors_.size();
}
unsigned int EventDistributor::configuredStreamCount ( ) const

Returns the number of streams that have been configured.

Definition at line 311 of file EventDistributor.cc.

References errorStreamSelectors_, and eventStreamSelectors_.

{
  return eventStreamSelectors_.size() +
    errorStreamSelectors_.size();
}
const bool EventDistributor::full ( ) const

Returns true if no further events can be processed, e.g. the StreamQueue is full

Definition at line 239 of file EventDistributor.cc.

References sharedResources_.

Referenced by stor::DrainingQueues::allQueuesAndWorkersAreEmpty(), stor::FragmentProcessor::processOneFragmentIfPossible(), and stor::DrainingQueues::processStaleFragments().

{
  return sharedResources_->streamQueue_->full();
}
unsigned int EventDistributor::initializedConsumerCount ( ) const

Returns the number of consumers that have been configured and initialized.

Definition at line 346 of file EventDistributor.cc.

References cmsDriverOptions::counter, and eventConsumerSelectors_.

{
  unsigned int counter = 0;
  for (ConsSelList::const_iterator it = eventConsumerSelectors_.begin(),
         itEnd = eventConsumerSelectors_.end();
       it != itEnd;
       ++it)
  {
    if ( (*it)->isInitialized() )
      ++counter;
  }
  return counter;
}
unsigned int EventDistributor::initializedStreamCount ( ) const

Returns the number of streams that have been configured and initialized.

Definition at line 318 of file EventDistributor.cc.

References cmsDriverOptions::counter, and eventStreamSelectors_.

{
  unsigned int counter = 0;
  for (EvtSelList::const_iterator it = eventStreamSelectors_.begin(),
         itEnd = eventStreamSelectors_.end();
       it != itEnd;
       ++it)
  {
    if ( (*it)->isInitialized() )
      ++counter;
  }
  return counter;
}
void EventDistributor::registerDQMEventConsumer ( const DQMEventConsRegPtr  ptr)

Registers a new DQM consumer

Definition at line 274 of file EventDistributor.cc.

References dqmEventSelectors_.

Referenced by stor::DQMEventConsumerRegistrationInfo::do_registerMe().

{
  DQMEvtSelPtr dqmEvtSel( new DQMEventSelector(ptr) );
  dqmEventSelectors_.insert( dqmEvtSel );
}
void EventDistributor::registerErrorStreams ( const ErrStrConfigListPtr  cl)

Registers the full set of error event streams.

Definition at line 292 of file EventDistributor.cc.

References errorStreamSelectors_.

Referenced by stor::Running::do_entryActionWork().

{
  for( ErrStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
       it != itEnd;
       ++it )
  {
    ErrSelPtr errSel( new ErrorStreamSelector(*it) );
    errorStreamSelectors_.insert( errSel );
  }
}
void EventDistributor::registerEventConsumer ( const EventConsRegPtr  regPtr)

Registers a new consumer

Definition at line 246 of file EventDistributor.cc.

References stor::AlarmHandler::ERROR.

Referenced by stor::EventConsumerRegistrationInfo::do_registerMe().

{
  ConsSelPtr evtSel( new EventConsumerSelector(regPtr) );

  InitMsgSharedPtr initMsgPtr =
    sharedResources_->initMsgCollection_->getElementForOutputModule(
      regPtr->outputModuleLabel()
    );
  if ( initMsgPtr.get() != 0 )
  {
    uint8* initPtr = &(*initMsgPtr)[0];
    InitMsgView initView(initPtr);
    try
    {
      evtSel->initialize( initView );
    }
    catch( stor::exception::InvalidEventSelection& e )
    {
      sharedResources_->statisticsReporter_->alarmHandler()->
        notifySentinel(AlarmHandler::ERROR, e);
    }
  }
  
  eventConsumerSelectors_.insert( evtSel );
}
void EventDistributor::registerEventStreams ( const EvtStrConfigListPtr  cl)

Registers the full set of event streams.

Definition at line 280 of file EventDistributor.cc.

References eventStreamSelectors_.

Referenced by stor::Running::do_entryActionWork().

{
  for( EvtStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
       it != itEnd;
       ++it )
  {
    EvtSelPtr evtSel( new EventStreamSelector(*it) );
    eventStreamSelectors_.insert( evtSel );
  }
}
void EventDistributor::tagCompleteEventForQueues ( I2OChain ioc) [private]

Definition at line 90 of file EventDistributor.cc.

References stor::DataSenderMonitorCollection::addDQMEventSample(), stor::DataSenderMonitorCollection::addErrorEventSample(), stor::DataSenderMonitorCollection::addEventSample(), stor::DataSenderMonitorCollection::addFaultyEventSample(), stor::DataSenderMonitorCollection::addInitSample(), stor::MonitoredQuantity::addSample(), stor::MonitoredQuantity::addSampleIfLarger(), b, stor::I2OChain::copyFragmentsIntoBuffer(), Header::DQM_EVENT, dqmEventSelectors_, stor::AlarmHandler::ERROR, Header::ERROR_EVENT, errorStreamSelectors_, Header::EVENT, eventConsumerSelectors_, stor::I2OChain::eventNumber(), eventStreamSelectors_, stor::utils::getCurrentTime(), stor::RunMonitorCollection::getErrorEventIDsReceivedMQ(), stor::RunMonitorCollection::getEventIDsReceivedMQ(), stor::RunMonitorCollection::getLumiSectionsSeenMQ(), stor::RunMonitorCollection::getRunNumbersSeenMQ(), Header::INIT, stor::EventConsumerSelector::initialize(), stor::EventStreamSelector::initialize(), stor::I2OChain::isTaggedForAnyDQMEventConsumer(), stor::I2OChain::lumiSection(), stor::I2OChain::messageCode(), runTheMatrix::msg, cmsPerfSuiteHarvest::now, stor::I2OChain::runNumber(), sharedResources_, stor::I2OChain::tagForDQMEventConsumer(), stor::I2OChain::tagForEventConsumer(), and stor::I2OChain::tagForStream().

Referenced by addEventToRelevantQueues().

{
  switch( ioc.messageCode() )
  {
    
    case Header::INIT:
    {
      std::vector<unsigned char> b;
      ioc.copyFragmentsIntoBuffer(b);
      InitMsgView imv( &b[0] );
      if( sharedResources_->initMsgCollection_->addIfUnique( imv ) )
      {
        try
        {
          for_each(eventStreamSelectors_.begin(),eventStreamSelectors_.end(),
            boost::bind(&EventStreamSelector::initialize, _1, imv));

          for_each(eventConsumerSelectors_.begin(), eventConsumerSelectors_.end(),
            boost::bind(&EventConsumerSelector::initialize, _1, imv));
        }
        catch( stor::exception::InvalidEventSelection& e )
        {
          sharedResources_->statisticsReporter_->alarmHandler()->
            notifySentinel(AlarmHandler::ERROR,e);
        }
      }
      
      DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
        statisticsReporter_->getDataSenderMonitorCollection();
      dataSenderMonColl.addInitSample(ioc);
      
      break;
    }
    
    case Header::EVENT:
    {
      for( EvtSelList::iterator it = eventStreamSelectors_.begin(),
             itEnd = eventStreamSelectors_.end();
           it != itEnd;
           ++it )
      {
        if( (*it)->acceptEvent( ioc ) )
        {
          ioc.tagForStream( (*it)->configInfo().streamId() );
        }
      }
      for( ConsSelList::iterator it = eventConsumerSelectors_.begin(),
             itEnd = eventConsumerSelectors_.end();
           it != itEnd;
           ++it )
      {
        if( (*it)->acceptEvent( ioc ) )
        {
          ioc.tagForEventConsumer( (*it)->queueId() );
        }
      }
      
      RunMonitorCollection& runMonCollection = sharedResources_->
        statisticsReporter_->getRunMonitorCollection();
      runMonCollection.getRunNumbersSeenMQ().addSampleIfLarger(ioc.runNumber());
      runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
      runMonCollection.getEventIDsReceivedMQ().addSample(ioc.eventNumber());
      
      DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
        statisticsReporter_->getDataSenderMonitorCollection();
      dataSenderMonColl.addEventSample(ioc);

      break;
    }
    
    case Header::DQM_EVENT:
    {
      utils::TimePoint_t now = utils::getCurrentTime();

      for( DQMEvtSelList::iterator it = dqmEventSelectors_.begin(),
             itEnd = dqmEventSelectors_.end();
           it != itEnd;
           ++it)
      {
        if( (*it)->acceptEvent( ioc, now ) )
        {
          ioc.tagForDQMEventConsumer( (*it)->queueId() );
        }
      }
      
      DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
        statisticsReporter_->getDataSenderMonitorCollection();
      dataSenderMonColl.addDQMEventSample(ioc);

      if( ioc.isTaggedForAnyDQMEventConsumer() )
      {
        sharedResources_->dqmEventQueue_->enqNowait( ioc );
      }
      else
      {
        sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
          getDroppedDQMEventCountsMQ().addSample(1);
      }
      
      break;
    }
    
    case Header::ERROR_EVENT:
    {
      for( ErrSelList::iterator it = errorStreamSelectors_.begin(),
             itEnd = errorStreamSelectors_.end();
           it != itEnd;
           ++it )
      {
        if( (*it)->acceptEvent( ioc ) )
        {
          ioc.tagForStream( (*it)->configInfo().streamId() );
        }
      }
      
      RunMonitorCollection& runMonCollection = sharedResources_->
        statisticsReporter_->getRunMonitorCollection();
      runMonCollection.getRunNumbersSeenMQ().addSample(ioc.runNumber());
      runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
      runMonCollection.getErrorEventIDsReceivedMQ().addSample(ioc.eventNumber());
      
      DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
        statisticsReporter_->getDataSenderMonitorCollection();
      dataSenderMonColl.addErrorEventSample(ioc);

      break;
    }
    
    default:
    {
      std::ostringstream msg;
      msg << "I2OChain with unknown message type " <<
        ioc.messageCode();
      XCEPT_DECLARE( stor::exception::WrongI2OMessageType,
                     xcept, msg.str());
      sharedResources_->statisticsReporter_->
        alarmHandler()->notifySentinel(AlarmHandler::ERROR, xcept);

      // 24-Jun-2009, KAB - this is not really the best way to track this,
      // but it's probably better than nothing in the short term.
      DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
        statisticsReporter_->getDataSenderMonitorCollection();
      dataSenderMonColl.addFaultyEventSample(ioc);

      break;
    }
  }
}

Member Data Documentation