#include <EventDistributor.h>
Distributes complete events to appropriate queues
It receives complete events in form of I2OChains and distributes it to the appropriate queues by checking the I2O message type and the trigger bits in the event header.
Definition at line 44 of file EventDistributor.h.
typedef std::set<ConsSelPtr, utils::ptrComp<EventConsumerSelector> > stor::EventDistributor::ConsSelList [private] |
Definition at line 137 of file EventDistributor.h.
typedef boost::shared_ptr<EventConsumerSelector> stor::EventDistributor::ConsSelPtr [private] |
Definition at line 136 of file EventDistributor.h.
typedef std::set<DQMEvtSelPtr, utils::ptrComp<DQMEventSelector> > stor::EventDistributor::DQMEvtSelList [private] |
Definition at line 129 of file EventDistributor.h.
typedef boost::shared_ptr<DQMEventSelector> stor::EventDistributor::DQMEvtSelPtr [private] |
Definition at line 128 of file EventDistributor.h.
typedef std::set<ErrSelPtr, utils::ptrComp<ErrorStreamSelector> > stor::EventDistributor::ErrSelList [private] |
Definition at line 133 of file EventDistributor.h.
typedef boost::shared_ptr<ErrorStreamSelector> stor::EventDistributor::ErrSelPtr [private] |
Definition at line 132 of file EventDistributor.h.
typedef std::set<EvtSelPtr, utils::ptrComp<EventStreamSelector> > stor::EventDistributor::EvtSelList [private] |
Definition at line 125 of file EventDistributor.h.
typedef boost::shared_ptr<EventStreamSelector> stor::EventDistributor::EvtSelPtr [private] |
Definition at line 124 of file EventDistributor.h.
EventDistributor::EventDistributor | ( | SharedResourcesPtr | sr | ) |
Definition at line 27 of file EventDistributor.cc.
: sharedResources_(sr) {}
EventDistributor::~EventDistributor | ( | ) |
Definition at line 32 of file EventDistributor.cc.
References clearConsumers(), and clearStreams().
{ clearStreams(); clearConsumers(); }
void EventDistributor::addEventToRelevantQueues | ( | I2OChain & | ioc | ) |
Add the event given as I2OChain to the appropriate queues
Definition at line 38 of file EventDistributor.cc.
References stor::DataSenderMonitorCollection::addFaultyEventSample(), stor::RunMonitorCollection::addUnwantedEvent(), stor::I2OChain::complete(), stor::AlarmHandler::ERROR, Header::EVENT, stor::FragKey::event_, stor::I2OChain::faulty(), stor::I2OChain::faultyBits(), stor::I2OChain::fragmentKey(), stor::I2OChain::hltURL(), I2O_SM_DATA, I2O_SM_ERROR, stor::I2OChain::i2oMessageCode(), stor::I2OChain::isTaggedForAnyEventConsumer(), stor::I2OChain::isTaggedForAnyStream(), stor::I2OChain::messageCode(), runTheMatrix::msg, stor::I2OChain::rbBufferId(), sharedResources_, tagCompleteEventForQueues(), and stor::I2OChain::tagForStream().
Referenced by stor::DrainingQueues::processStaleFragments().
{ // special handling for faulty or incomplete events if ( ioc.faulty() || !ioc.complete() ) { std::ostringstream msg; msg << "Faulty or incomplete I2OChain for event " << ioc.fragmentKey().event_ << ": 0x" << std::hex << ioc.faultyBits() << " received from " << ioc.hltURL() << " (rbBufferId " << ioc.rbBufferId() << ")."; XCEPT_DECLARE( stor::exception::IncompleteEventMessage, xcept, msg.str()); sharedResources_->statisticsReporter_->alarmHandler()-> notifySentinel(AlarmHandler::ERROR, xcept); DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->statisticsReporter_->getDataSenderMonitorCollection(); dataSenderMonColl.addFaultyEventSample(ioc); if ( !( sharedResources_->configuration_->getDiskWritingParams().faultyEventsStream_.empty() ) && ( ioc.i2oMessageCode() == I2O_SM_DATA || ioc.i2oMessageCode() == I2O_SM_ERROR) ) ioc.tagForStream(0); // special stream for faulty events } else { tagCompleteEventForQueues( ioc ); } // Check if event belongs here at all: bool unexpected = true; if( ioc.isTaggedForAnyStream() ) { unexpected = false; sharedResources_->streamQueue_->enqWait( ioc ); } if( ioc.isTaggedForAnyEventConsumer() ) { unexpected = false; sharedResources_->eventQueueCollection_->addEvent( ioc ); } if( unexpected && ioc.messageCode() == Header::EVENT ) { RunMonitorCollection& runMonColl = sharedResources_->statisticsReporter_->getRunMonitorCollection(); runMonColl.addUnwantedEvent(ioc); } }
void EventDistributor::checkForStaleConsumers | ( | ) |
Updates staleness info for consumers.
Definition at line 361 of file EventDistributor.cc.
References stor::utils::getCurrentTime(), cmsPerfSuiteHarvest::now, and sharedResources_.
{ utils::TimePoint_t now = utils::getCurrentTime(); EventQueueCollectionPtr eqc = sharedResources_->eventQueueCollection_; eqc->clearStaleQueues(now); DQMEventQueueCollectionPtr dqc = sharedResources_->dqmEventQueueCollection_; dqc->clearStaleQueues(now); }
void EventDistributor::clearConsumers | ( | ) |
Clears out all existing consumer registrations.
Definition at line 333 of file EventDistributor.cc.
References dqmEventSelectors_, and eventConsumerSelectors_.
Referenced by stor::Running::do_entryActionWork(), and ~EventDistributor().
{ eventConsumerSelectors_.clear(); dqmEventSelectors_.clear(); }
void EventDistributor::clearStreams | ( | ) |
Clears out all existing event and error streams.
Definition at line 304 of file EventDistributor.cc.
References errorStreamSelectors_, and eventStreamSelectors_.
Referenced by ~EventDistributor().
{ eventStreamSelectors_.clear(); errorStreamSelectors_.clear(); }
unsigned int EventDistributor::configuredConsumerCount | ( | ) | const |
Returns the number of consumers that have been configured.
Definition at line 340 of file EventDistributor.cc.
References dqmEventSelectors_, and eventConsumerSelectors_.
{ return eventConsumerSelectors_.size() + dqmEventSelectors_.size(); }
unsigned int EventDistributor::configuredStreamCount | ( | ) | const |
Returns the number of streams that have been configured.
Definition at line 311 of file EventDistributor.cc.
References errorStreamSelectors_, and eventStreamSelectors_.
{ return eventStreamSelectors_.size() + errorStreamSelectors_.size(); }
const bool EventDistributor::full | ( | ) | const |
Returns true if no further events can be processed, e.g. the StreamQueue is full
Definition at line 239 of file EventDistributor.cc.
References sharedResources_.
Referenced by stor::DrainingQueues::allQueuesAndWorkersAreEmpty(), stor::FragmentProcessor::processOneFragmentIfPossible(), and stor::DrainingQueues::processStaleFragments().
{ return sharedResources_->streamQueue_->full(); }
unsigned int EventDistributor::initializedConsumerCount | ( | ) | const |
Returns the number of consumers that have been configured and initialized.
Definition at line 346 of file EventDistributor.cc.
References cmsDriverOptions::counter, and eventConsumerSelectors_.
{ unsigned int counter = 0; for (ConsSelList::const_iterator it = eventConsumerSelectors_.begin(), itEnd = eventConsumerSelectors_.end(); it != itEnd; ++it) { if ( (*it)->isInitialized() ) ++counter; } return counter; }
unsigned int EventDistributor::initializedStreamCount | ( | ) | const |
Returns the number of streams that have been configured and initialized.
Definition at line 318 of file EventDistributor.cc.
References cmsDriverOptions::counter, and eventStreamSelectors_.
{ unsigned int counter = 0; for (EvtSelList::const_iterator it = eventStreamSelectors_.begin(), itEnd = eventStreamSelectors_.end(); it != itEnd; ++it) { if ( (*it)->isInitialized() ) ++counter; } return counter; }
void EventDistributor::registerDQMEventConsumer | ( | const DQMEventConsRegPtr | ptr | ) |
Registers a new DQM consumer
Definition at line 274 of file EventDistributor.cc.
References dqmEventSelectors_.
Referenced by stor::DQMEventConsumerRegistrationInfo::do_registerMe().
{ DQMEvtSelPtr dqmEvtSel( new DQMEventSelector(ptr) ); dqmEventSelectors_.insert( dqmEvtSel ); }
void EventDistributor::registerErrorStreams | ( | const ErrStrConfigListPtr | cl | ) |
Registers the full set of error event streams.
Definition at line 292 of file EventDistributor.cc.
References errorStreamSelectors_.
Referenced by stor::Running::do_entryActionWork().
{ for( ErrStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end(); it != itEnd; ++it ) { ErrSelPtr errSel( new ErrorStreamSelector(*it) ); errorStreamSelectors_.insert( errSel ); } }
void EventDistributor::registerEventConsumer | ( | const EventConsRegPtr | regPtr | ) |
Registers a new consumer
Definition at line 246 of file EventDistributor.cc.
References stor::AlarmHandler::ERROR.
Referenced by stor::EventConsumerRegistrationInfo::do_registerMe().
{ ConsSelPtr evtSel( new EventConsumerSelector(regPtr) ); InitMsgSharedPtr initMsgPtr = sharedResources_->initMsgCollection_->getElementForOutputModule( regPtr->outputModuleLabel() ); if ( initMsgPtr.get() != 0 ) { uint8* initPtr = &(*initMsgPtr)[0]; InitMsgView initView(initPtr); try { evtSel->initialize( initView ); } catch( stor::exception::InvalidEventSelection& e ) { sharedResources_->statisticsReporter_->alarmHandler()-> notifySentinel(AlarmHandler::ERROR, e); } } eventConsumerSelectors_.insert( evtSel ); }
void EventDistributor::registerEventStreams | ( | const EvtStrConfigListPtr | cl | ) |
Registers the full set of event streams.
Definition at line 280 of file EventDistributor.cc.
References eventStreamSelectors_.
Referenced by stor::Running::do_entryActionWork().
{ for( EvtStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end(); it != itEnd; ++it ) { EvtSelPtr evtSel( new EventStreamSelector(*it) ); eventStreamSelectors_.insert( evtSel ); } }
void EventDistributor::tagCompleteEventForQueues | ( | I2OChain & | ioc | ) | [private] |
Definition at line 90 of file EventDistributor.cc.
References stor::DataSenderMonitorCollection::addDQMEventSample(), stor::DataSenderMonitorCollection::addErrorEventSample(), stor::DataSenderMonitorCollection::addEventSample(), stor::DataSenderMonitorCollection::addFaultyEventSample(), stor::DataSenderMonitorCollection::addInitSample(), stor::MonitoredQuantity::addSample(), stor::MonitoredQuantity::addSampleIfLarger(), b, stor::I2OChain::copyFragmentsIntoBuffer(), Header::DQM_EVENT, dqmEventSelectors_, stor::AlarmHandler::ERROR, Header::ERROR_EVENT, errorStreamSelectors_, Header::EVENT, eventConsumerSelectors_, stor::I2OChain::eventNumber(), eventStreamSelectors_, stor::utils::getCurrentTime(), stor::RunMonitorCollection::getErrorEventIDsReceivedMQ(), stor::RunMonitorCollection::getEventIDsReceivedMQ(), stor::RunMonitorCollection::getLumiSectionsSeenMQ(), stor::RunMonitorCollection::getRunNumbersSeenMQ(), Header::INIT, stor::EventConsumerSelector::initialize(), stor::EventStreamSelector::initialize(), stor::I2OChain::isTaggedForAnyDQMEventConsumer(), stor::I2OChain::lumiSection(), stor::I2OChain::messageCode(), runTheMatrix::msg, cmsPerfSuiteHarvest::now, stor::I2OChain::runNumber(), sharedResources_, stor::I2OChain::tagForDQMEventConsumer(), stor::I2OChain::tagForEventConsumer(), and stor::I2OChain::tagForStream().
Referenced by addEventToRelevantQueues().
{ switch( ioc.messageCode() ) { case Header::INIT: { std::vector<unsigned char> b; ioc.copyFragmentsIntoBuffer(b); InitMsgView imv( &b[0] ); if( sharedResources_->initMsgCollection_->addIfUnique( imv ) ) { try { for_each(eventStreamSelectors_.begin(),eventStreamSelectors_.end(), boost::bind(&EventStreamSelector::initialize, _1, imv)); for_each(eventConsumerSelectors_.begin(), eventConsumerSelectors_.end(), boost::bind(&EventConsumerSelector::initialize, _1, imv)); } catch( stor::exception::InvalidEventSelection& e ) { sharedResources_->statisticsReporter_->alarmHandler()-> notifySentinel(AlarmHandler::ERROR,e); } } DataSenderMonitorCollection& dataSenderMonColl = sharedResources_-> statisticsReporter_->getDataSenderMonitorCollection(); dataSenderMonColl.addInitSample(ioc); break; } case Header::EVENT: { for( EvtSelList::iterator it = eventStreamSelectors_.begin(), itEnd = eventStreamSelectors_.end(); it != itEnd; ++it ) { if( (*it)->acceptEvent( ioc ) ) { ioc.tagForStream( (*it)->configInfo().streamId() ); } } for( ConsSelList::iterator it = eventConsumerSelectors_.begin(), itEnd = eventConsumerSelectors_.end(); it != itEnd; ++it ) { if( (*it)->acceptEvent( ioc ) ) { ioc.tagForEventConsumer( (*it)->queueId() ); } } RunMonitorCollection& runMonCollection = sharedResources_-> statisticsReporter_->getRunMonitorCollection(); runMonCollection.getRunNumbersSeenMQ().addSampleIfLarger(ioc.runNumber()); runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection()); runMonCollection.getEventIDsReceivedMQ().addSample(ioc.eventNumber()); DataSenderMonitorCollection& dataSenderMonColl = sharedResources_-> statisticsReporter_->getDataSenderMonitorCollection(); dataSenderMonColl.addEventSample(ioc); break; } case Header::DQM_EVENT: { utils::TimePoint_t now = utils::getCurrentTime(); for( DQMEvtSelList::iterator it = dqmEventSelectors_.begin(), itEnd = dqmEventSelectors_.end(); it != itEnd; ++it) { if( (*it)->acceptEvent( ioc, now ) ) { ioc.tagForDQMEventConsumer( (*it)->queueId() ); } } DataSenderMonitorCollection& dataSenderMonColl = sharedResources_-> statisticsReporter_->getDataSenderMonitorCollection(); dataSenderMonColl.addDQMEventSample(ioc); if( ioc.isTaggedForAnyDQMEventConsumer() ) { sharedResources_->dqmEventQueue_->enqNowait( ioc ); } else { sharedResources_->statisticsReporter_->getDQMEventMonitorCollection(). getDroppedDQMEventCountsMQ().addSample(1); } break; } case Header::ERROR_EVENT: { for( ErrSelList::iterator it = errorStreamSelectors_.begin(), itEnd = errorStreamSelectors_.end(); it != itEnd; ++it ) { if( (*it)->acceptEvent( ioc ) ) { ioc.tagForStream( (*it)->configInfo().streamId() ); } } RunMonitorCollection& runMonCollection = sharedResources_-> statisticsReporter_->getRunMonitorCollection(); runMonCollection.getRunNumbersSeenMQ().addSample(ioc.runNumber()); runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection()); runMonCollection.getErrorEventIDsReceivedMQ().addSample(ioc.eventNumber()); DataSenderMonitorCollection& dataSenderMonColl = sharedResources_-> statisticsReporter_->getDataSenderMonitorCollection(); dataSenderMonColl.addErrorEventSample(ioc); break; } default: { std::ostringstream msg; msg << "I2OChain with unknown message type " << ioc.messageCode(); XCEPT_DECLARE( stor::exception::WrongI2OMessageType, xcept, msg.str()); sharedResources_->statisticsReporter_-> alarmHandler()->notifySentinel(AlarmHandler::ERROR, xcept); // 24-Jun-2009, KAB - this is not really the best way to track this, // but it's probably better than nothing in the short term. DataSenderMonitorCollection& dataSenderMonColl = sharedResources_-> statisticsReporter_->getDataSenderMonitorCollection(); dataSenderMonColl.addFaultyEventSample(ioc); break; } } }
Definition at line 130 of file EventDistributor.h.
Referenced by clearConsumers(), configuredConsumerCount(), registerDQMEventConsumer(), and tagCompleteEventForQueues().
Definition at line 134 of file EventDistributor.h.
Referenced by clearStreams(), configuredStreamCount(), registerErrorStreams(), and tagCompleteEventForQueues().
Definition at line 138 of file EventDistributor.h.
Referenced by clearConsumers(), configuredConsumerCount(), initializedConsumerCount(), and tagCompleteEventForQueues().
Definition at line 126 of file EventDistributor.h.
Referenced by clearStreams(), configuredStreamCount(), initializedStreamCount(), registerEventStreams(), and tagCompleteEventForQueues().
Definition at line 122 of file EventDistributor.h.
Referenced by addEventToRelevantQueues(), checkForStaleConsumers(), full(), and tagCompleteEventForQueues().