CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
stor::EventDistributor Class Reference

#include <EventDistributor.h>

Public Member Functions

void addEventToRelevantQueues (I2OChain &)
 
void checkForStaleConsumers ()
 
void clearConsumers ()
 
void clearStreams ()
 
unsigned int configuredConsumerCount () const
 
unsigned int configuredStreamCount () const
 
 EventDistributor (SharedResourcesPtr sr)
 
const bool full () const
 
unsigned int initializedConsumerCount () const
 
unsigned int initializedStreamCount () const
 
void registerDQMEventConsumer (const DQMEventConsRegPtr)
 
void registerErrorStreams (const ErrStrConfigListPtr)
 
void registerEventConsumer (const EventConsRegPtr)
 
void registerEventStreams (const EvtStrConfigListPtr)
 
 ~EventDistributor ()
 

Private Types

typedef std::set< ConsSelPtr,
utils::ptrComp
< EventConsumerSelector > > 
ConsSelList
 
typedef boost::shared_ptr
< EventConsumerSelector
ConsSelPtr
 
typedef std::set< DQMEvtSelPtr,
utils::ptrComp
< DQMEventSelector > > 
DQMEvtSelList
 
typedef boost::shared_ptr
< DQMEventSelector
DQMEvtSelPtr
 
typedef std::set< ErrSelPtr,
utils::ptrComp
< ErrorStreamSelector > > 
ErrSelList
 
typedef boost::shared_ptr
< ErrorStreamSelector
ErrSelPtr
 
typedef std::set< EvtSelPtr,
utils::ptrComp
< EventStreamSelector > > 
EvtSelList
 
typedef boost::shared_ptr
< EventStreamSelector
EvtSelPtr
 

Private Member Functions

void tagCompleteEventForQueues (I2OChain &)
 

Private Attributes

DQMEvtSelList dqmEventSelectors_
 
ErrSelList errorStreamSelectors_
 
ConsSelList eventConsumerSelectors_
 
EvtSelList eventStreamSelectors_
 
SharedResourcesPtr sharedResources_
 

Detailed Description

Distributes complete events to appropriate queues

It receives complete events in form of I2OChains and distributes it to the appropriate queues by checking the I2O message type and the trigger bits in the event header.

Author:
mommsen
Revision:
1.8
Date:
2011/03/07 15:31:31

Definition at line 44 of file EventDistributor.h.

Member Typedef Documentation

Definition at line 137 of file EventDistributor.h.

typedef boost::shared_ptr<EventConsumerSelector> stor::EventDistributor::ConsSelPtr
private

Definition at line 136 of file EventDistributor.h.

Definition at line 129 of file EventDistributor.h.

typedef boost::shared_ptr<DQMEventSelector> stor::EventDistributor::DQMEvtSelPtr
private

Definition at line 128 of file EventDistributor.h.

Definition at line 133 of file EventDistributor.h.

typedef boost::shared_ptr<ErrorStreamSelector> stor::EventDistributor::ErrSelPtr
private

Definition at line 132 of file EventDistributor.h.

Definition at line 125 of file EventDistributor.h.

typedef boost::shared_ptr<EventStreamSelector> stor::EventDistributor::EvtSelPtr
private

Definition at line 124 of file EventDistributor.h.

Constructor & Destructor Documentation

EventDistributor::EventDistributor ( SharedResourcesPtr  sr)

Definition at line 27 of file EventDistributor.cc.

27  :
29 {}
SharedResourcesPtr sharedResources_
EventDistributor::~EventDistributor ( )

Definition at line 32 of file EventDistributor.cc.

References clearConsumers(), and clearStreams().

Member Function Documentation

void EventDistributor::addEventToRelevantQueues ( I2OChain ioc)

Add the event given as I2OChain to the appropriate queues

Definition at line 38 of file EventDistributor.cc.

References stor::DataSenderMonitorCollection::addFaultyEventSample(), stor::RunMonitorCollection::addUnwantedEvent(), stor::I2OChain::complete(), stor::AlarmHandler::ERROR, Header::EVENT, stor::FragKey::event_, stor::I2OChain::faulty(), stor::I2OChain::faultyBits(), stor::I2OChain::fragmentKey(), stor::I2OChain::hltURL(), I2O_SM_DATA, I2O_SM_ERROR, stor::I2OChain::i2oMessageCode(), stor::I2OChain::isTaggedForAnyEventConsumer(), stor::I2OChain::isTaggedForAnyStream(), stor::I2OChain::messageCode(), lumiQueryAPI::msg, stor::I2OChain::rbBufferId(), sharedResources_, tagCompleteEventForQueues(), and stor::I2OChain::tagForStream().

Referenced by stor::DrainingQueues::processStaleFragments().

39 {
40  // special handling for faulty or incomplete events
41  if ( ioc.faulty() || !ioc.complete() )
42  {
43  std::ostringstream msg;
44  msg << "Faulty or incomplete I2OChain for event "
45  << ioc.fragmentKey().event_
46  << ": 0x" << std::hex << ioc.faultyBits()
47  << " received from " << ioc.hltURL()
48  << " (rbBufferId " << ioc.rbBufferId() << ").";
49  XCEPT_DECLARE( stor::exception::IncompleteEventMessage,
50  xcept, msg.str());
51  sharedResources_->alarmHandler_->
52  notifySentinel(AlarmHandler::ERROR, xcept);
53 
54  DataSenderMonitorCollection& dataSenderMonColl =
55  sharedResources_->statisticsReporter_->getDataSenderMonitorCollection();
56  dataSenderMonColl.addFaultyEventSample(ioc);
57 
58  if ( !( sharedResources_->configuration_->getDiskWritingParams().faultyEventsStream_.empty() ) &&
59  ( ioc.i2oMessageCode() == I2O_SM_DATA || ioc.i2oMessageCode() == I2O_SM_ERROR) )
60  ioc.tagForStream(0); // special stream for faulty events
61  }
62  else
63  {
65  }
66 
67  // Check if event belongs here at all:
68  bool unexpected = true;
69 
70  if( ioc.isTaggedForAnyStream() )
71  {
72  unexpected = false;
73  sharedResources_->streamQueue_->enqWait( ioc );
74  }
75 
76  if( ioc.isTaggedForAnyEventConsumer() )
77  {
78  unexpected = false;
79  sharedResources_->eventQueueCollection_->addEvent( ioc );
80  }
81 
82  if( unexpected && ioc.messageCode() == Header::EVENT )
83  {
84  RunMonitorCollection& runMonColl =
85  sharedResources_->statisticsReporter_->getRunMonitorCollection();
86  runMonColl.addUnwantedEvent(ioc);
87  }
88 }
#define I2O_SM_ERROR
Definition: i2oEvfMsgs.h:23
unsigned short i2oMessageCode() const
Definition: I2OChain.cc:224
bool complete() const
Definition: I2OChain.cc:118
uint32 event_
Definition: FragKey.h:34
unsigned int faultyBits() const
Definition: I2OChain.cc:132
unsigned int messageCode() const
Definition: I2OChain.cc:218
SharedResourcesPtr sharedResources_
bool isTaggedForAnyStream() const
Definition: I2OChain.cc:356
unsigned int rbBufferId() const
Definition: I2OChain.cc:230
FragKey fragmentKey() const
Definition: I2OChain.cc:278
void tagCompleteEventForQueues(I2OChain &)
void tagForStream(StreamID)
Definition: I2OChain.cc:320
void addUnwantedEvent(const I2OChain &)
bool faulty() const
Definition: I2OChain.cc:125
#define I2O_SM_DATA
Definition: i2oEvfMsgs.h:22
std::string hltURL() const
Definition: I2OChain.cc:254
bool isTaggedForAnyEventConsumer() const
Definition: I2OChain.cc:362
void EventDistributor::checkForStaleConsumers ( )

Updates staleness info for consumers.

Definition at line 361 of file EventDistributor.cc.

References stor::utils::getCurrentTime(), cmsPerfSuiteHarvest::now, and sharedResources_.

362 {
364 
366  sharedResources_->eventQueueCollection_;
367  eqc->clearStaleQueues(now);
368 
370  sharedResources_->dqmEventQueueCollection_;
371  dqc->clearStaleQueues(now);
372 }
TimePoint_t getCurrentTime()
Definition: Utils.h:158
SharedResourcesPtr sharedResources_
boost::shared_ptr< DQMEventQueueCollection > DQMEventQueueCollectionPtr
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
boost::shared_ptr< EventQueueCollection > EventQueueCollectionPtr
void EventDistributor::clearConsumers ( )

Clears out all existing consumer registrations.

Definition at line 333 of file EventDistributor.cc.

References dqmEventSelectors_, and eventConsumerSelectors_.

Referenced by stor::Running::do_entryActionWork(), and ~EventDistributor().

334 {
335  eventConsumerSelectors_.clear();
336  dqmEventSelectors_.clear();
337 }
DQMEvtSelList dqmEventSelectors_
ConsSelList eventConsumerSelectors_
void EventDistributor::clearStreams ( )

Clears out all existing event and error streams.

Definition at line 304 of file EventDistributor.cc.

References errorStreamSelectors_, and eventStreamSelectors_.

Referenced by ~EventDistributor().

305 {
306  eventStreamSelectors_.clear();
307  errorStreamSelectors_.clear();
308 }
unsigned int EventDistributor::configuredConsumerCount ( ) const

Returns the number of consumers that have been configured.

Definition at line 340 of file EventDistributor.cc.

References dqmEventSelectors_, and eventConsumerSelectors_.

341 {
342  return eventConsumerSelectors_.size() + dqmEventSelectors_.size();
343 }
DQMEvtSelList dqmEventSelectors_
ConsSelList eventConsumerSelectors_
unsigned int EventDistributor::configuredStreamCount ( ) const

Returns the number of streams that have been configured.

Definition at line 311 of file EventDistributor.cc.

References errorStreamSelectors_, and eventStreamSelectors_.

312 {
313  return eventStreamSelectors_.size() +
314  errorStreamSelectors_.size();
315 }
const bool EventDistributor::full ( ) const

Returns true if no further events can be processed, e.g. the StreamQueue is full

Definition at line 239 of file EventDistributor.cc.

References sharedResources_.

Referenced by stor::DrainingQueues::allQueuesAndWorkersAreEmpty(), stor::FragmentProcessor::processOneFragmentIfPossible(), and stor::DrainingQueues::processStaleFragments().

240 {
241  return sharedResources_->streamQueue_->full();
242 }
SharedResourcesPtr sharedResources_
unsigned int EventDistributor::initializedConsumerCount ( ) const

Returns the number of consumers that have been configured and initialized.

Definition at line 346 of file EventDistributor.cc.

References eventConsumerSelectors_.

347 {
348  unsigned int counter = 0;
349  for (ConsSelList::const_iterator it = eventConsumerSelectors_.begin(),
350  itEnd = eventConsumerSelectors_.end();
351  it != itEnd;
352  ++it)
353  {
354  if ( (*it)->isInitialized() )
355  ++counter;
356  }
357  return counter;
358 }
ConsSelList eventConsumerSelectors_
unsigned int EventDistributor::initializedStreamCount ( ) const

Returns the number of streams that have been configured and initialized.

Definition at line 318 of file EventDistributor.cc.

References eventStreamSelectors_.

319 {
320  unsigned int counter = 0;
321  for (EvtSelList::const_iterator it = eventStreamSelectors_.begin(),
322  itEnd = eventStreamSelectors_.end();
323  it != itEnd;
324  ++it)
325  {
326  if ( (*it)->isInitialized() )
327  ++counter;
328  }
329  return counter;
330 }
void EventDistributor::registerDQMEventConsumer ( const DQMEventConsRegPtr  ptr)

Registers a new DQM consumer

Definition at line 274 of file EventDistributor.cc.

References dqmEventSelectors_.

Referenced by stor::DQMEventConsumerRegistrationInfo::do_registerMe().

275 {
276  DQMEvtSelPtr dqmEvtSel( new DQMEventSelector(ptr) );
277  dqmEventSelectors_.insert( dqmEvtSel );
278 }
boost::shared_ptr< DQMEventSelector > DQMEvtSelPtr
DQMEvtSelList dqmEventSelectors_
void EventDistributor::registerErrorStreams ( const ErrStrConfigListPtr  cl)

Registers the full set of error event streams.

Definition at line 292 of file EventDistributor.cc.

References errorStreamSelectors_.

Referenced by stor::Running::do_entryActionWork().

293 {
294  for( ErrStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
295  it != itEnd;
296  ++it )
297  {
298  ErrSelPtr errSel( new ErrorStreamSelector(*it) );
299  errorStreamSelectors_.insert( errSel );
300  }
301 }
float cl
Definition: Combine.cc:71
boost::shared_ptr< ErrorStreamSelector > ErrSelPtr
void EventDistributor::registerEventConsumer ( const EventConsRegPtr  regPtr)

Registers a new consumer

Definition at line 246 of file EventDistributor.cc.

References alignCSCRings::e, and stor::AlarmHandler::ERROR.

Referenced by stor::EventConsumerRegistrationInfo::do_registerMe().

249 {
250  ConsSelPtr evtSel( new EventConsumerSelector(regPtr) );
251 
252  InitMsgSharedPtr initMsgPtr =
253  sharedResources_->initMsgCollection_->getElementForOutputModuleLabel(
254  regPtr->outputModuleLabel()
255  );
256  if ( initMsgPtr.get() != 0 )
257  {
258  uint8* initPtr = &(*initMsgPtr)[0];
259  InitMsgView initView(initPtr);
260  try
261  {
262  evtSel->initialize( initView );
263  }
264  catch( stor::exception::InvalidEventSelection& e )
265  {
266  sharedResources_->alarmHandler_->
267  notifySentinel(AlarmHandler::ERROR, e);
268  }
269  }
270 
271  eventConsumerSelectors_.insert( evtSel );
272 }
boost::shared_ptr< InitMsgBuffer > InitMsgSharedPtr
SharedResourcesPtr sharedResources_
boost::shared_ptr< EventConsumerSelector > ConsSelPtr
unsigned char uint8
Definition: MsgTools.h:11
ConsSelList eventConsumerSelectors_
void EventDistributor::registerEventStreams ( const EvtStrConfigListPtr  cl)

Registers the full set of event streams.

Definition at line 280 of file EventDistributor.cc.

References eventStreamSelectors_.

Referenced by stor::Running::do_entryActionWork().

281 {
282  for( EvtStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
283  it != itEnd;
284  ++it )
285  {
286  EvtSelPtr evtSel( new EventStreamSelector(*it) );
287  eventStreamSelectors_.insert( evtSel );
288  }
289 }
boost::shared_ptr< EventStreamSelector > EvtSelPtr
float cl
Definition: Combine.cc:71
void EventDistributor::tagCompleteEventForQueues ( I2OChain ioc)
private

Definition at line 90 of file EventDistributor.cc.

References stor::DataSenderMonitorCollection::addDQMEventSample(), stor::DataSenderMonitorCollection::addErrorEventSample(), stor::DataSenderMonitorCollection::addEventSample(), stor::DataSenderMonitorCollection::addFaultyEventSample(), stor::DataSenderMonitorCollection::addInitSample(), stor::MonitoredQuantity::addSample(), stor::MonitoredQuantity::addSampleIfLarger(), Header::DQM_EVENT, dqmEventSelectors_, alignCSCRings::e, stor::AlarmHandler::ERROR, Header::ERROR_EVENT, errorStreamSelectors_, Header::EVENT, eventConsumerSelectors_, stor::I2OChain::eventNumber(), eventStreamSelectors_, stor::utils::getCurrentTime(), stor::RunMonitorCollection::getErrorEventIDsReceivedMQ(), stor::RunMonitorCollection::getEventIDsReceivedMQ(), stor::RunMonitorCollection::getLumiSectionsSeenMQ(), stor::RunMonitorCollection::getRunNumbersSeenMQ(), Header::INIT, stor::EventStreamSelector::initialize(), stor::EventConsumerSelector::initialize(), stor::I2OChain::isTaggedForAnyDQMEventConsumer(), stor::I2OChain::lumiSection(), stor::I2OChain::messageCode(), lumiQueryAPI::msg, cmsPerfSuiteHarvest::now, stor::I2OChain::runNumber(), sharedResources_, stor::I2OChain::tagForDQMEventConsumer(), stor::I2OChain::tagForEventConsumer(), and stor::I2OChain::tagForStream().

Referenced by addEventToRelevantQueues().

91 {
92  switch( ioc.messageCode() )
93  {
94 
95  case Header::INIT:
96  {
97  InitMsgSharedPtr serializedProds;
98  if( sharedResources_->initMsgCollection_->addIfUnique(ioc, serializedProds) )
99  {
100  try
101  {
102  InitMsgView initMsgView(&(*serializedProds)[0]);
103 
104  for_each(eventStreamSelectors_.begin(),eventStreamSelectors_.end(),
105  boost::bind(&EventStreamSelector::initialize, _1, initMsgView));
106 
107  for_each(eventConsumerSelectors_.begin(), eventConsumerSelectors_.end(),
108  boost::bind(&EventConsumerSelector::initialize, _1, initMsgView));
109  }
110  catch( stor::exception::InvalidEventSelection& e )
111  {
112  sharedResources_->alarmHandler_->
113  notifySentinel(AlarmHandler::ERROR,e);
114  }
115  }
116 
117  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
118  statisticsReporter_->getDataSenderMonitorCollection();
119  dataSenderMonColl.addInitSample(ioc);
120 
121  break;
122  }
123 
124  case Header::EVENT:
125  {
126  for( EvtSelList::iterator it = eventStreamSelectors_.begin(),
127  itEnd = eventStreamSelectors_.end();
128  it != itEnd;
129  ++it )
130  {
131  if( (*it)->acceptEvent( ioc ) )
132  {
133  ioc.tagForStream( (*it)->configInfo().streamId() );
134  }
135  }
136  for( ConsSelList::iterator it = eventConsumerSelectors_.begin(),
137  itEnd = eventConsumerSelectors_.end();
138  it != itEnd;
139  ++it )
140  {
141  if( (*it)->acceptEvent( ioc ) )
142  {
143  ioc.tagForEventConsumer( (*it)->queueId() );
144  }
145  }
146 
147  RunMonitorCollection& runMonCollection = sharedResources_->
148  statisticsReporter_->getRunMonitorCollection();
149  runMonCollection.getRunNumbersSeenMQ().addSampleIfLarger(ioc.runNumber());
150  runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
151  runMonCollection.getEventIDsReceivedMQ().addSample(ioc.eventNumber());
152 
153  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
154  statisticsReporter_->getDataSenderMonitorCollection();
155  dataSenderMonColl.addEventSample(ioc);
156 
157  break;
158  }
159 
160  case Header::DQM_EVENT:
161  {
163 
164  for( DQMEvtSelList::iterator it = dqmEventSelectors_.begin(),
165  itEnd = dqmEventSelectors_.end();
166  it != itEnd;
167  ++it)
168  {
169  if( (*it)->acceptEvent( ioc, now ) )
170  {
171  ioc.tagForDQMEventConsumer( (*it)->queueId() );
172  }
173  }
174 
175  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
176  statisticsReporter_->getDataSenderMonitorCollection();
177  dataSenderMonColl.addDQMEventSample(ioc);
178 
180  {
181  sharedResources_->dqmEventQueue_->enqNowait( ioc );
182  }
183  else
184  {
185  sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
186  getDroppedDQMEventCountsMQ().addSample(1);
187  }
188 
189  break;
190  }
191 
192  case Header::ERROR_EVENT:
193  {
194  for( ErrSelList::iterator it = errorStreamSelectors_.begin(),
195  itEnd = errorStreamSelectors_.end();
196  it != itEnd;
197  ++it )
198  {
199  if( (*it)->acceptEvent( ioc ) )
200  {
201  ioc.tagForStream( (*it)->configInfo().streamId() );
202  }
203  }
204 
205  RunMonitorCollection& runMonCollection = sharedResources_->
206  statisticsReporter_->getRunMonitorCollection();
207  runMonCollection.getRunNumbersSeenMQ().addSample(ioc.runNumber());
208  runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
209  runMonCollection.getErrorEventIDsReceivedMQ().addSample(ioc.eventNumber());
210 
211  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
212  statisticsReporter_->getDataSenderMonitorCollection();
213  dataSenderMonColl.addErrorEventSample(ioc);
214 
215  break;
216  }
217 
218  default:
219  {
220  std::ostringstream msg;
221  msg << "I2OChain with unknown message type " <<
222  ioc.messageCode();
223  XCEPT_DECLARE( stor::exception::WrongI2OMessageType,
224  xcept, msg.str());
226  alarmHandler_->notifySentinel(AlarmHandler::ERROR, xcept);
227 
228  // 24-Jun-2009, KAB - this is not really the best way to track this,
229  // but it's probably better than nothing in the short term.
230  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
231  statisticsReporter_->getDataSenderMonitorCollection();
232  dataSenderMonColl.addFaultyEventSample(ioc);
233 
234  break;
235  }
236  }
237 }
TimePoint_t getCurrentTime()
Definition: Utils.h:158
uint32_t runNumber() const
Definition: I2OChain.cc:585
const MonitoredQuantity & getLumiSectionsSeenMQ() const
void addSample(const double &value=1)
void tagForEventConsumer(QueueID)
Definition: I2OChain.cc:332
const MonitoredQuantity & getErrorEventIDsReceivedMQ() const
boost::shared_ptr< InitMsgBuffer > InitMsgSharedPtr
unsigned int messageCode() const
Definition: I2OChain.cc:218
SharedResourcesPtr sharedResources_
uint32_t lumiSection() const
Definition: I2OChain.cc:595
DQMEvtSelList dqmEventSelectors_
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
const MonitoredQuantity & getEventIDsReceivedMQ() const
void initialize(const InitMsgView &)
void initialize(const InitMsgView &)
void tagForStream(StreamID)
Definition: I2OChain.cc:320
ConsSelList eventConsumerSelectors_
void addSampleIfLarger(const double &value)
void tagForDQMEventConsumer(QueueID)
Definition: I2OChain.cc:344
bool isTaggedForAnyDQMEventConsumer() const
Definition: I2OChain.cc:368
uint32_t eventNumber() const
Definition: I2OChain.cc:605
const MonitoredQuantity & getRunNumbersSeenMQ() const

Member Data Documentation

DQMEvtSelList stor::EventDistributor::dqmEventSelectors_
private
ErrSelList stor::EventDistributor::errorStreamSelectors_
private
ConsSelList stor::EventDistributor::eventConsumerSelectors_
private
EvtSelList stor::EventDistributor::eventStreamSelectors_
private
SharedResourcesPtr stor::EventDistributor::sharedResources_
private