8 #include <boost/pointer_cast.hpp>
20 MonitorCollection(updateInterval),
22 updateInterval_(updateInterval),
23 eventTypeMqMap_(updateInterval)
32 boost::mutex::scoped_lock sl(statsMutex_);
36 retrieverMqMap_.insert(
40 eventTypeMqMap_.insert(regPtr);
48 return nextConnectionId_;
58 boost::mutex::scoped_lock sl(statsMutex_);
59 RetrieverMqMap::const_iterator
pos = retrieverMqMap_.find(connectionId);
60 if ( pos == retrieverMqMap_.end() )
return false;
61 pos->second->connectionStatus_ =
status;
72 boost::mutex::scoped_lock sl(statsMutex_);
73 RetrieverMqMap::const_iterator
pos = retrieverMqMap_.find(connectionId);
75 if ( pos == retrieverMqMap_.end() )
return false;
77 stats.
regPtr = pos->second->regPtr_;
79 pos->second->size_.getStats(stats.
sizeStats);
88 const unsigned int&
size
91 boost::mutex::scoped_lock sl(statsMutex_);
93 RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
94 if ( retrieverPos == retrieverMqMap_.end() )
return false;
96 const double sizeKB =
static_cast<double>(
size) / 1024;
97 retrieverPos->second->size_.addSample(sizeKB);
99 const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
101 eventTypeMqMap_.addSample(regPtr, sizeKB);
103 const std::string
sourceURL = regPtr->sourceURL();
104 ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
105 connectionPos->second->addSample(sizeKB);
107 totalSize_.addSample(sizeKB);
124 if ( it->second->connectionStatus_ ==
CONNECTED )
143 it->second->getStats(stats);
159 stats.
regPtr = mq->regPtr_;
162 etsl.push_back(stats);
177 it->second->size_.calculateStatistics();
183 it->second->calculateStatistics();
204 insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
205 insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
214 addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
215 addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
223 eventTypeStats.clear();
224 eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
226 for (EventMap::const_iterator it = eventMap_.begin(),
227 itEnd = eventMap_.end(); it != itEnd; ++it)
230 it->second->getStats(etStats);
231 eventTypeStats.push_back(
232 std::make_pair(it->first, etStats));
235 for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
236 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
239 it->second->getStats(etStats);
240 eventTypeStats.push_back(
241 std::make_pair(it->first, etStats));
249 for (EventMap::iterator it = eventMap_.begin(),
250 itEnd = eventMap_.end(); it != itEnd; ++it)
252 it->second->calculateStatistics();
254 for (DQMEventMap::iterator it = dqmEventMap_.begin(),
255 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
257 it->second->calculateStatistics();
266 dqmEventMap_.clear();
273 if ( eventConsumer == 0 )
return false;
285 if ( dqmEventConsumer == 0 )
return false;
297 if ( eventConsumer == 0 )
return false;
298 EventMap::const_iterator
pos = eventMap_.find(eventConsumer);
299 pos->second->addSample(sizeKB);
307 if ( dqmEventConsumer == 0 )
return false;
308 DQMEventMap::const_iterator
pos = dqmEventMap_.find(dqmEventConsumer);
309 pos->second->addSample(sizeKB);
316 if ( regPtr->sourceURL() != other.
regPtr->sourceURL() )
317 return ( regPtr->sourceURL() < other.
regPtr->sourceURL() );
323 if ( ecrp && ecrpOther )
324 return ( *ecrp < *ecrpOther);
330 if ( dcrp && dcrpOther )
331 return ( *dcrp < *dcrpOther);
362 os <<
"Could not connect. SM not running?";
365 os <<
"Lost connection to SM. Did it fail?";
bool operator<(const EventTypeStats &) const
std::vector< EventTypeStats > EventTypeStatList
void getStatsByConnection(ConnectionStats &) const
RetrieverMqMap retrieverMqMap_
boost::shared_ptr< RegistrationInfoBase > RegPtr
void getStatsByEventTypes(EventTypeStatList &) const
stor::MonitoredQuantity::Stats sizeStats
boost::shared_ptr< MonitoredQuantity > MonitoredQuantityPtr
bool addSample(const stor::RegPtr, const double &sizeKB)
bool setConnectionStatus(const ConnectionID &, const ConnectionStatus &)
bool getEventTypeStatsForConnection(const ConnectionID &, EventTypeStats &)
DataRetrieverMonitorCollection(const stor::utils::Duration_t &updateInterval)
bool insert(const stor::RegPtr)
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
Container::value_type value_type
void getStats(Stats &stats) const
ConnectionMqMap connectionMqMap_
void calculateStatistics()
bool addRetrievedSample(const ConnectionID &, const unsigned int &size)
virtual void do_calculateStatistics()
EventTypeMqMap eventTypeMqMap_
ConnectionStatus connectionStatus
void calculateStatistics(const utils::TimePoint_t ¤tTime=utils::getCurrentTime())
void getSummaryStats(SummaryStats &) const
boost::posix_time::time_duration Duration_t
stor::MonitoredQuantity::Stats sizeStats
boost::shared_ptr< DataRetrieverMQ > DataRetrieverMQPtr
const stor::utils::Duration_t updateInterval_
EventTypeStatList eventTypeStats
std::map< std::string, stor::MonitoredQuantity::Stats > ConnectionStats
void getStats(SummaryStats::EventTypeStatList &) const
stor::MonitoredQuantity totalSize_
DataRetrieverMQ(stor::RegPtr, const stor::utils::Duration_t &updateInterval)
ConnectionID addNewConnection(const stor::RegPtr)
std::vector< EventTypeStats > EventTypeStatList
tuple size
Write out results.
boost::shared_ptr< stor::DQMEventConsumerRegistrationInfo > DQMEventConsRegPtr