CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventDistributor.cc
Go to the documentation of this file.
1 // $Id: EventDistributor.cc,v 1.26 2012/04/20 10:48:02 mommsen Exp $
3 
21 
23 
24 using namespace stor;
25 
26 
28  sharedResources_(sr)
29 {}
30 
31 
33 {
34  clearStreams();
36 }
37 
39 {
40  // special handling for faulty or incomplete events
41  if ( ioc.faulty() || !ioc.complete() )
42  {
43  std::ostringstream msg;
44  msg << "Faulty or incomplete I2OChain for event "
45  << ioc.fragmentKey().event_
46  << ": 0x" << std::hex << ioc.faultyBits()
47  << " received from " << ioc.hltURL()
48  << " (rbBufferId " << ioc.rbBufferId() << ").";
49  XCEPT_DECLARE( stor::exception::IncompleteEventMessage,
50  xcept, msg.str());
51  sharedResources_->alarmHandler_->
52  notifySentinel(AlarmHandler::ERROR, xcept);
53 
54  DataSenderMonitorCollection& dataSenderMonColl =
55  sharedResources_->statisticsReporter_->getDataSenderMonitorCollection();
56  dataSenderMonColl.addFaultyEventSample(ioc);
57 
58  if ( !( sharedResources_->configuration_->getDiskWritingParams().faultyEventsStream_.empty() ) &&
59  ( ioc.i2oMessageCode() == I2O_SM_DATA || ioc.i2oMessageCode() == I2O_SM_ERROR) )
60  ioc.tagForStream(0); // special stream for faulty events
61  }
62  else
63  {
65  }
66 
67  // Check if event belongs here at all:
68  bool unexpected = true;
69 
70  if( ioc.isTaggedForAnyStream() )
71  {
72  unexpected = false;
73  sharedResources_->streamQueue_->enqWait( ioc );
74  }
75 
76  if( ioc.isTaggedForAnyEventConsumer() )
77  {
78  unexpected = false;
79  sharedResources_->eventQueueCollection_->addEvent( ioc );
80  }
81 
82  if( unexpected && ioc.messageCode() == Header::EVENT )
83  {
84  RunMonitorCollection& runMonColl =
85  sharedResources_->statisticsReporter_->getRunMonitorCollection();
86  runMonColl.addUnwantedEvent(ioc);
87  }
88 }
89 
91 {
92  switch( ioc.messageCode() )
93  {
94 
95  case Header::INIT:
96  {
97  InitMsgSharedPtr serializedProds;
98  if( sharedResources_->initMsgCollection_->addIfUnique(ioc, serializedProds) )
99  {
100  try
101  {
102  InitMsgView initMsgView(&(*serializedProds)[0]);
103 
104  for_each(eventStreamSelectors_.begin(),eventStreamSelectors_.end(),
105  boost::bind(&EventStreamSelector::initialize, _1, initMsgView));
106 
107  for_each(eventConsumerSelectors_.begin(), eventConsumerSelectors_.end(),
108  boost::bind(&EventConsumerSelector::initialize, _1, initMsgView));
109  }
110  catch( stor::exception::InvalidEventSelection& e )
111  {
112  sharedResources_->alarmHandler_->
113  notifySentinel(AlarmHandler::ERROR,e);
114  }
115  }
116 
117  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
118  statisticsReporter_->getDataSenderMonitorCollection();
119  dataSenderMonColl.addInitSample(ioc);
120 
121  break;
122  }
123 
124  case Header::EVENT:
125  {
126  for( EvtSelList::iterator it = eventStreamSelectors_.begin(),
127  itEnd = eventStreamSelectors_.end();
128  it != itEnd;
129  ++it )
130  {
131  if( (*it)->acceptEvent( ioc ) )
132  {
133  ioc.tagForStream( (*it)->configInfo().streamId() );
134  }
135  }
136  for( ConsSelList::iterator it = eventConsumerSelectors_.begin(),
137  itEnd = eventConsumerSelectors_.end();
138  it != itEnd;
139  ++it )
140  {
141  if( (*it)->acceptEvent( ioc ) )
142  {
143  ioc.tagForEventConsumer( (*it)->queueId() );
144  }
145  }
146 
147  RunMonitorCollection& runMonCollection = sharedResources_->
148  statisticsReporter_->getRunMonitorCollection();
149  runMonCollection.getRunNumbersSeenMQ().addSampleIfLarger(ioc.runNumber());
150  runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
151  runMonCollection.getEventIDsReceivedMQ().addSample(ioc.eventNumber());
152 
153  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
154  statisticsReporter_->getDataSenderMonitorCollection();
155  dataSenderMonColl.addEventSample(ioc);
156 
157  break;
158  }
159 
160  case Header::DQM_EVENT:
161  {
163 
164  for( DQMEvtSelList::iterator it = dqmEventSelectors_.begin(),
165  itEnd = dqmEventSelectors_.end();
166  it != itEnd;
167  ++it)
168  {
169  if( (*it)->acceptEvent( ioc, now ) )
170  {
171  ioc.tagForDQMEventConsumer( (*it)->queueId() );
172  }
173  }
174 
175  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
176  statisticsReporter_->getDataSenderMonitorCollection();
177  dataSenderMonColl.addDQMEventSample(ioc);
178 
180  {
181  sharedResources_->dqmEventQueue_->enqNowait( ioc );
182  }
183  else
184  {
185  sharedResources_->statisticsReporter_->getDQMEventMonitorCollection().
186  getDroppedDQMEventCountsMQ().addSample(1);
187  }
188 
189  break;
190  }
191 
192  case Header::ERROR_EVENT:
193  {
194  for( ErrSelList::iterator it = errorStreamSelectors_.begin(),
195  itEnd = errorStreamSelectors_.end();
196  it != itEnd;
197  ++it )
198  {
199  if( (*it)->acceptEvent( ioc ) )
200  {
201  ioc.tagForStream( (*it)->configInfo().streamId() );
202  }
203  }
204 
205  RunMonitorCollection& runMonCollection = sharedResources_->
206  statisticsReporter_->getRunMonitorCollection();
207  runMonCollection.getRunNumbersSeenMQ().addSample(ioc.runNumber());
208  runMonCollection.getLumiSectionsSeenMQ().addSampleIfLarger(ioc.lumiSection());
209  runMonCollection.getErrorEventIDsReceivedMQ().addSample(ioc.eventNumber());
210 
211  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
212  statisticsReporter_->getDataSenderMonitorCollection();
213  dataSenderMonColl.addErrorEventSample(ioc);
214 
215  break;
216  }
217 
218  default:
219  {
220  std::ostringstream msg;
221  msg << "I2OChain with unknown message type " <<
222  ioc.messageCode();
223  XCEPT_DECLARE( stor::exception::WrongI2OMessageType,
224  xcept, msg.str());
226  alarmHandler_->notifySentinel(AlarmHandler::ERROR, xcept);
227 
228  // 24-Jun-2009, KAB - this is not really the best way to track this,
229  // but it's probably better than nothing in the short term.
230  DataSenderMonitorCollection& dataSenderMonColl = sharedResources_->
231  statisticsReporter_->getDataSenderMonitorCollection();
232  dataSenderMonColl.addFaultyEventSample(ioc);
233 
234  break;
235  }
236  }
237 }
238 
239 const bool EventDistributor::full() const
240 {
241  return sharedResources_->streamQueue_->full();
242 }
243 
244 
246 (
247  const EventConsRegPtr regPtr
248 )
249 {
250  ConsSelPtr evtSel( new EventConsumerSelector(regPtr) );
251 
252  InitMsgSharedPtr initMsgPtr =
253  sharedResources_->initMsgCollection_->getElementForOutputModuleLabel(
254  regPtr->outputModuleLabel()
255  );
256  if ( initMsgPtr.get() != 0 )
257  {
258  uint8* initPtr = &(*initMsgPtr)[0];
259  InitMsgView initView(initPtr);
260  try
261  {
262  evtSel->initialize( initView );
263  }
264  catch( stor::exception::InvalidEventSelection& e )
265  {
266  sharedResources_->alarmHandler_->
267  notifySentinel(AlarmHandler::ERROR, e);
268  }
269  }
270 
271  eventConsumerSelectors_.insert( evtSel );
272 }
273 
275 {
276  DQMEvtSelPtr dqmEvtSel( new DQMEventSelector(ptr) );
277  dqmEventSelectors_.insert( dqmEvtSel );
278 }
279 
281 {
282  for( EvtStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
283  it != itEnd;
284  ++it )
285  {
286  EvtSelPtr evtSel( new EventStreamSelector(*it) );
287  eventStreamSelectors_.insert( evtSel );
288  }
289 }
290 
291 
293 {
294  for( ErrStrConfigList::const_iterator it = cl->begin(), itEnd = cl->end();
295  it != itEnd;
296  ++it )
297  {
298  ErrSelPtr errSel( new ErrorStreamSelector(*it) );
299  errorStreamSelectors_.insert( errSel );
300  }
301 }
302 
303 
305 {
306  eventStreamSelectors_.clear();
307  errorStreamSelectors_.clear();
308 }
309 
310 
312 {
313  return eventStreamSelectors_.size() +
314  errorStreamSelectors_.size();
315 }
316 
317 
319 {
320  unsigned int counter = 0;
321  for (EvtSelList::const_iterator it = eventStreamSelectors_.begin(),
322  itEnd = eventStreamSelectors_.end();
323  it != itEnd;
324  ++it)
325  {
326  if ( (*it)->isInitialized() )
327  ++counter;
328  }
329  return counter;
330 }
331 
332 
334 {
335  eventConsumerSelectors_.clear();
336  dqmEventSelectors_.clear();
337 }
338 
339 
341 {
342  return eventConsumerSelectors_.size() + dqmEventSelectors_.size();
343 }
344 
345 
347 {
348  unsigned int counter = 0;
349  for (ConsSelList::const_iterator it = eventConsumerSelectors_.begin(),
350  itEnd = eventConsumerSelectors_.end();
351  it != itEnd;
352  ++it)
353  {
354  if ( (*it)->isInitialized() )
355  ++counter;
356  }
357  return counter;
358 }
359 
360 
362 {
364 
366  sharedResources_->eventQueueCollection_;
367  eqc->clearStaleQueues(now);
368 
370  sharedResources_->dqmEventQueueCollection_;
371  dqc->clearStaleQueues(now);
372 }
373 
374 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
uint32_t runNumber() const
Definition: I2OChain.cc:585
#define I2O_SM_ERROR
Definition: i2oEvfMsgs.h:23
boost::shared_ptr< DQMEventSelector > DQMEvtSelPtr
const MonitoredQuantity & getLumiSectionsSeenMQ() const
boost::shared_ptr< ErrStrConfigList > ErrStrConfigListPtr
void registerErrorStreams(const ErrStrConfigListPtr)
EventDistributor(SharedResourcesPtr sr)
unsigned short i2oMessageCode() const
Definition: I2OChain.cc:224
void addSample(const double &value=1)
bool complete() const
Definition: I2OChain.cc:118
void tagForEventConsumer(QueueID)
Definition: I2OChain.cc:332
const MonitoredQuantity & getErrorEventIDsReceivedMQ() const
boost::shared_ptr< SharedResources > SharedResourcesPtr
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
uint32 event_
Definition: FragKey.h:34
boost::shared_ptr< InitMsgBuffer > InitMsgSharedPtr
const bool full() const
unsigned int faultyBits() const
Definition: I2OChain.cc:132
unsigned int messageCode() const
Definition: I2OChain.cc:218
SharedResourcesPtr sharedResources_
boost::shared_ptr< DQMEventQueueCollection > DQMEventQueueCollectionPtr
boost::shared_ptr< EvtStrConfigList > EvtStrConfigListPtr
uint32_t lumiSection() const
Definition: I2OChain.cc:595
void registerDQMEventConsumer(const DQMEventConsRegPtr)
DQMEvtSelList dqmEventSelectors_
unsigned int configuredStreamCount() const
boost::shared_ptr< EventStreamSelector > EvtSelPtr
unsigned int configuredConsumerCount() const
bool isTaggedForAnyStream() const
Definition: I2OChain.cc:356
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void registerEventConsumer(const EventConsRegPtr)
float cl
Definition: Combine.cc:71
boost::shared_ptr< EventConsumerSelector > ConsSelPtr
unsigned int initializedStreamCount() const
const MonitoredQuantity & getEventIDsReceivedMQ() const
boost::shared_ptr< ErrorStreamSelector > ErrSelPtr
void initialize(const InitMsgView &)
void initialize(const InitMsgView &)
unsigned int rbBufferId() const
Definition: I2OChain.cc:230
unsigned char uint8
Definition: MsgTools.h:11
FragKey fragmentKey() const
Definition: I2OChain.cc:278
boost::shared_ptr< EventQueueCollection > EventQueueCollectionPtr
void tagCompleteEventForQueues(I2OChain &)
void tagForStream(StreamID)
Definition: I2OChain.cc:320
ConsSelList eventConsumerSelectors_
void addUnwantedEvent(const I2OChain &)
void addSampleIfLarger(const double &value)
void tagForDQMEventConsumer(QueueID)
Definition: I2OChain.cc:344
bool isTaggedForAnyDQMEventConsumer() const
Definition: I2OChain.cc:368
void registerEventStreams(const EvtStrConfigListPtr)
uint32_t eventNumber() const
Definition: I2OChain.cc:605
bool faulty() const
Definition: I2OChain.cc:125
#define I2O_SM_DATA
Definition: i2oEvfMsgs.h:22
const MonitoredQuantity & getRunNumbersSeenMQ() const
boost::shared_ptr< stor::DQMEventConsumerRegistrationInfo > DQMEventConsRegPtr
void addEventToRelevantQueues(I2OChain &)
std::string hltURL() const
Definition: I2OChain.cc:254
unsigned int initializedConsumerCount() const
bool isTaggedForAnyEventConsumer() const
Definition: I2OChain.cc:362