8 #include <boost/pointer_cast.hpp>
21 MonitorCollection(updateInterval),
22 updateInterval_(updateInterval),
23 alarmHandler_(alarmHandler),
24 totals_(updateInterval),
25 eventTypeMqMap_(updateInterval)
34 boost::mutex::scoped_lock sl(statsMutex_);
38 retrieverMqMap_.insert(
42 eventTypeMqMap_.insert(regPtr);
49 return nextConnectionId_;
59 boost::mutex::scoped_lock sl(statsMutex_);
60 RetrieverMqMap::const_iterator
pos = retrieverMqMap_.find(connectionId);
61 if ( pos == retrieverMqMap_.end() )
return false;
62 pos->second->connectionStatus_ =
status;
73 boost::mutex::scoped_lock sl(statsMutex_);
74 RetrieverMqMap::const_iterator
pos = retrieverMqMap_.find(connectionId);
76 if ( pos == retrieverMqMap_.end() )
return false;
78 stats.
regPtr = pos->second->regPtr_;
80 pos->second->eventMQ_->getStats(stats.
eventStats);
89 const unsigned int&
size
92 boost::mutex::scoped_lock sl(statsMutex_);
94 RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
95 if ( retrieverPos == retrieverMqMap_.end() )
return false;
97 const double sizeKB =
static_cast<double>(
size) / 1024;
98 retrieverPos->second->eventMQ_->size_.addSample(sizeKB);
100 const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
102 eventTypeMqMap_.addSample(regPtr, sizeKB);
104 const std::string
sourceURL = regPtr->sourceURL();
105 ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
106 connectionPos->second->size_.addSample(sizeKB);
108 totals_.size_.addSample(sizeKB);
119 boost::mutex::scoped_lock sl(statsMutex_);
121 RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
122 if ( retrieverPos == retrieverMqMap_.end() )
return false;
124 retrieverPos->second->eventMQ_->corruptedEvents_.addSample(1);
126 const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
128 eventTypeMqMap_.receivedCorruptedEvent(regPtr);
130 const std::string
sourceURL = regPtr->sourceURL();
131 ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
132 connectionPos->second->corruptedEvents_.addSample(1);
134 totals_.corruptedEvents_.addSample(1);
151 if ( it->second->connectionStatus_ ==
CONNECTED )
170 it->second->getStats(stats);
181 boost::mutex::scoped_lock sl(statsMutex_);
184 for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
185 itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
189 stats.
regPtr = mq->regPtr_;
192 etsl.push_back(stats);
214 const std::string alarmName =
"CorruptedEvents";
218 const double corruptedEventRate =
222 std::ostringstream
msg;
223 msg <<
"Received " << corruptedEventRate <<
" Hz of corrupted events from StorageManagers.";
224 XCEPT_DECLARE(exception::CorruptedEvents, ex, msg.str());
244 it->second->eventMQ_->calculateStatistics();
250 it->second->calculateStatistics();
273 insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
274 insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
283 addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
284 addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
302 eventTypeStats.clear();
303 eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
305 for (EventMap::const_iterator it = eventMap_.begin(),
306 itEnd = eventMap_.end(); it != itEnd; ++it)
309 it->second->size_.getStats(eventStats.
sizeStats);
311 eventTypeStats.push_back(
312 std::make_pair(it->first, eventStats));
315 for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
316 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
319 it->second->size_.getStats(eventStats.
sizeStats);
321 eventTypeStats.push_back(
322 std::make_pair(it->first, eventStats));
330 for (EventMap::iterator it = eventMap_.begin(),
331 itEnd = eventMap_.end(); it != itEnd; ++it)
333 it->second->size_.calculateStatistics();
334 it->second->corruptedEvents_.calculateStatistics();
336 for (DQMEventMap::iterator it = dqmEventMap_.begin(),
337 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
339 it->second->size_.calculateStatistics();
340 it->second->corruptedEvents_.calculateStatistics();
349 dqmEventMap_.clear();
356 if ( eventConsumer == 0 )
return false;
367 if ( dqmEventConsumer == 0 )
return false;
378 if ( eventConsumer == 0 )
return false;
379 EventMap::const_iterator
pos = eventMap_.find(eventConsumer);
380 pos->second->size_.addSample(sizeKB);
388 if ( dqmEventConsumer == 0 )
return false;
389 DQMEventMap::const_iterator
pos = dqmEventMap_.find(dqmEventConsumer);
390 pos->second->size_.addSample(sizeKB);
398 if ( eventConsumer == 0 )
return false;
399 EventMap::const_iterator
pos = eventMap_.find(eventConsumer);
400 pos->second->corruptedEvents_.addSample(1);
408 if ( dqmEventConsumer == 0 )
return false;
409 DQMEventMap::const_iterator
pos = dqmEventMap_.find(dqmEventConsumer);
410 pos->second->corruptedEvents_.addSample(1);
418 if ( regPtr->sourceURL() != other.
regPtr->sourceURL() )
419 return ( regPtr->sourceURL() < other.
regPtr->sourceURL() );
425 if ( ecrp && ecrpOther )
426 return ( *ecrp < *ecrpOther);
432 if ( dcrp && dcrpOther )
433 return ( *dcrp < *dcrpOther);
457 size_.calculateStatistics();
458 corruptedEvents_.calculateStatistics();
465 corruptedEvents_.reset();
476 eventMQ_(
new EventMQ(updateInterval))
494 os <<
"Could not connect. SM not running?";
497 os <<
"Lost connection to SM. Did it fail?";
double getValueRate(DataSetType t=FULL) const
void getStats(EventStats &) const
void getStatsByConnection(ConnectionStats &) const
std::map< std::string, EventStats > ConnectionStats
auto_ptr< ClusterSequence > cs
RetrieverMqMap retrieverMqMap_
boost::shared_ptr< RegistrationInfoBase > RegPtr
double corruptedEventRate_
boost::shared_ptr< EventMQ > EventMQPtr
void getStatsByEventTypesPerConnection(EventTypePerConnectionStatList &) const
bool addSample(const stor::RegPtr, const double &sizeKB)
bool setConnectionStatus(const ConnectionID &, const ConnectionStatus &)
bool receivedCorruptedEvent(const stor::RegPtr)
bool receivedCorruptedEvent(const ConnectionID &)
bool insert(const stor::RegPtr)
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
ConnectionMqMap connectionMqMap_
void calculateStatistics()
stor::MonitoredQuantity::Stats sizeStats
bool addRetrievedSample(const ConnectionID &, const unsigned int &size)
virtual void do_calculateStatistics()
EventTypeMqMap eventTypeMqMap_
void getSummaryStats(SummaryStats &) const
boost::posix_time::time_duration Duration_t
std::vector< EventTypePerConnectionStats > EventTypePerConnectionStatList
bool operator<(const EventTypePerConnectionStats &) const
boost::shared_ptr< DataRetrieverMQ > DataRetrieverMQPtr
void calculateStatistics()
Container::value_type value_type
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
void configureAlarms(AlarmParams const &)
const stor::utils::Duration_t updateInterval_
EventTypeStatList eventTypeStats
void getStats(SummaryStats::EventTypeStatList &) const
ConnectionStatus connectionStatus
stor::MonitoredQuantity::Stats corruptedEventsStats
DataRetrieverMQ(stor::RegPtr, const stor::utils::Duration_t &updateInterval)
EventMQ(const stor::utils::Duration_t &updateInterval)
ConnectionID addNewConnection(const stor::RegPtr)
stor::AlarmHandlerPtr alarmHandler_
void checkForCorruptedEvents()
std::vector< EventTypeStats > EventTypeStatList
DataRetrieverMonitorCollection(const stor::utils::Duration_t &updateInterval, stor::AlarmHandlerPtr)
bool getEventTypeStatsForConnection(const ConnectionID &, EventTypePerConnectionStats &)
tuple size
Write out results.
boost::shared_ptr< stor::DQMEventConsumerRegistrationInfo > DQMEventConsRegPtr