Go to the documentation of this file.00001
00003
00004 #include "EventFilter/StorageManager/interface/ConsumerMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/QueueID.h"
00006 #include "EventFilter/StorageManager/interface/MonitoredQuantity.h"
00007
00008
00009 namespace stor {
00010
00011 ConsumerMonitorCollection::ConsumerMonitorCollection
00012 (
00013 const utils::Duration_t& updateInterval,
00014 const utils::Duration_t& recentDuration
00015 ):
00016 MonitorCollection(updateInterval),
00017 updateInterval_(updateInterval),
00018 recentDuration_(recentDuration),
00019 totalQueuedMQ_(updateInterval, recentDuration),
00020 totalDroppedMQ_(updateInterval, recentDuration),
00021 totalServedMQ_(updateInterval, recentDuration)
00022 {}
00023
00024
00025 void ConsumerMonitorCollection::addQueuedEventSample
00026 (
00027 const QueueID& qid,
00028 const unsigned int& data_size
00029 )
00030 {
00031 boost::mutex::scoped_lock l( mutex_ );
00032 addEventSampleToMap(qid, data_size, qmap_);
00033 totalQueuedMQ_.addSample(data_size);
00034 }
00035
00036
00037 void ConsumerMonitorCollection::addDroppedEvents
00038 (
00039 const QueueID& qid,
00040 const size_t& count
00041 )
00042 {
00043 boost::mutex::scoped_lock l( mutex_ );
00044 addEventSampleToMap(qid, count, dmap_);
00045 totalDroppedMQ_.addSample(count);
00046 }
00047
00048
00049 void ConsumerMonitorCollection::addServedEventSample
00050 (
00051 const QueueID& qid,
00052 const unsigned int& data_size
00053 )
00054 {
00055 boost::mutex::scoped_lock l( mutex_ );
00056 addEventSampleToMap(qid, data_size, smap_);
00057 totalServedMQ_.addSample(data_size);
00058 }
00059
00060
00061 void ConsumerMonitorCollection::addEventSampleToMap
00062 (
00063 const QueueID& qid,
00064 const unsigned int& data_size,
00065 ConsStatMap& map
00066 )
00067 {
00068 ConsStatMap::iterator pos = map.lower_bound(qid);
00069
00070
00071
00072
00073
00074
00075
00076 if (pos == map.end() || (map.key_comp()(qid, pos->first)))
00077 {
00078
00079
00080 pos = map.insert(pos,
00081 ConsStatMap::value_type(qid,
00082 MonitoredQuantityPtr(
00083 new MonitoredQuantity(updateInterval_, recentDuration_)
00084 )
00085 )
00086 );
00087 }
00088
00089 pos->second->addSample( data_size );
00090 }
00091
00092
00093 bool ConsumerMonitorCollection::getQueued
00094 (
00095 const QueueID& qid,
00096 MonitoredQuantity::Stats& result
00097 ) const
00098 {
00099 boost::mutex::scoped_lock l( mutex_ );
00100 return getValueFromMap( qid, result, qmap_ );
00101 }
00102
00103
00104 bool ConsumerMonitorCollection::getServed
00105 (
00106 const QueueID& qid,
00107 MonitoredQuantity::Stats& result
00108 ) const
00109 {
00110 boost::mutex::scoped_lock l( mutex_ );
00111 return getValueFromMap( qid, result, smap_ );
00112 }
00113
00114
00115 bool ConsumerMonitorCollection::getDropped
00116 (
00117 const QueueID& qid,
00118 MonitoredQuantity::Stats& result
00119 ) const
00120 {
00121 boost::mutex::scoped_lock l( mutex_ );
00122 return getValueFromMap( qid, result, dmap_ );
00123 }
00124
00125 bool ConsumerMonitorCollection::getValueFromMap
00126 (
00127 const QueueID& qid,
00128 MonitoredQuantity::Stats& result,
00129 const ConsStatMap& map
00130 ) const
00131 {
00132 ConsStatMap::const_iterator pos = map.find(qid);
00133
00134 if (pos == map.end()) return false;
00135
00136 pos->second->getStats( result );
00137 return true;
00138 }
00139
00140
00141 void ConsumerMonitorCollection::getTotalStats( TotalStats& totalStats ) const
00142 {
00143 totalQueuedMQ_.getStats(totalStats.queuedStats);
00144 totalDroppedMQ_.getStats(totalStats.droppedStats);
00145 totalServedMQ_.getStats(totalStats.servedStats);
00146 }
00147
00148 void ConsumerMonitorCollection::resetCounters()
00149 {
00150 boost::mutex::scoped_lock l( mutex_ );
00151 for( ConsStatMap::iterator i = qmap_.begin(); i != qmap_.end(); ++i )
00152 i->second->reset();
00153 for( ConsStatMap::iterator i = smap_.begin(); i != smap_.end(); ++i )
00154 i->second->reset();
00155 for( ConsStatMap::iterator i = dmap_.begin(); i != dmap_.end(); ++i )
00156 i->second->reset();
00157
00158 totalQueuedMQ_.reset();
00159 totalDroppedMQ_.reset();
00160 totalServedMQ_.reset();
00161 }
00162
00163
00164 void ConsumerMonitorCollection::do_calculateStatistics()
00165 {
00166 boost::mutex::scoped_lock l( mutex_ );
00167 for( ConsStatMap::iterator i = qmap_.begin(); i != qmap_.end(); ++i )
00168 i->second->calculateStatistics();
00169 for( ConsStatMap::iterator i = smap_.begin(); i != smap_.end(); ++i )
00170 i->second->calculateStatistics();
00171 for( ConsStatMap::iterator i = dmap_.begin(); i != dmap_.end(); ++i )
00172 i->second->calculateStatistics();
00173
00174 totalQueuedMQ_.calculateStatistics();
00175 totalDroppedMQ_.calculateStatistics();
00176 totalServedMQ_.calculateStatistics();
00177 }
00178
00179
00180 void ConsumerMonitorCollection::do_reset()
00181 {
00182 boost::mutex::scoped_lock l( mutex_ );
00183 qmap_.clear();
00184 smap_.clear();
00185 dmap_.clear();
00186
00187 totalQueuedMQ_.reset();
00188 totalDroppedMQ_.reset();
00189 totalServedMQ_.reset();
00190 }
00191
00192 }
00193