Go to the documentation of this file.00001
00003
#include <algorithm>
#include <iomanip>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

#include <boost/pointer_cast.hpp>

#include "EventFilter/SMProxyServer/interface/Exception.h"
#include "EventFilter/SMProxyServer/interface/DataRetrieverMonitorCollection.h"
00012
00013
00014 namespace smproxy {
00015
00016 DataRetrieverMonitorCollection::DataRetrieverMonitorCollection
00017 (
00018 const stor::utils::Duration_t& updateInterval
00019 ) :
00020 MonitorCollection(updateInterval),
00021 totalSize_(updateInterval, boost::posix_time::seconds(60)),
00022 updateInterval_(updateInterval),
00023 eventTypeMqMap_(updateInterval)
00024 {}
00025
00026
00027 ConnectionID DataRetrieverMonitorCollection::addNewConnection
00028 (
00029 const stor::RegPtr regPtr
00030 )
00031 {
00032 boost::mutex::scoped_lock sl(statsMutex_);
00033 ++nextConnectionId_;
00034
00035 DataRetrieverMQPtr dataRetrieverMQ( new DataRetrieverMQ(regPtr, updateInterval_) );
00036 retrieverMqMap_.insert(
00037 RetrieverMqMap::value_type(nextConnectionId_, dataRetrieverMQ)
00038 );
00039
00040 eventTypeMqMap_.insert(regPtr);
00041
00042 connectionMqMap_.insert(ConnectionMqMap::value_type(regPtr->sourceURL(),
00043 stor::MonitoredQuantityPtr(
00044 new stor::MonitoredQuantity(updateInterval_, boost::posix_time::seconds(60))
00045 )
00046 ));
00047
00048 return nextConnectionId_;
00049 }
00050
00051
00052 bool DataRetrieverMonitorCollection::setConnectionStatus
00053 (
00054 const ConnectionID& connectionId,
00055 const ConnectionStatus& status
00056 )
00057 {
00058 boost::mutex::scoped_lock sl(statsMutex_);
00059 RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00060 if ( pos == retrieverMqMap_.end() ) return false;
00061 pos->second->connectionStatus_ = status;
00062 return true;
00063 }
00064
00065
00066 bool DataRetrieverMonitorCollection::getEventTypeStatsForConnection
00067 (
00068 const ConnectionID& connectionId,
00069 EventTypeStats& stats
00070 )
00071 {
00072 boost::mutex::scoped_lock sl(statsMutex_);
00073 RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00074
00075 if ( pos == retrieverMqMap_.end() ) return false;
00076
00077 stats.regPtr = pos->second->regPtr_;
00078 stats.connectionStatus = pos->second->connectionStatus_;
00079 pos->second->size_.getStats(stats.sizeStats);
00080
00081 return true;
00082 }
00083
00084
00085 bool DataRetrieverMonitorCollection::addRetrievedSample
00086 (
00087 const ConnectionID& connectionId,
00088 const unsigned int& size
00089 )
00090 {
00091 boost::mutex::scoped_lock sl(statsMutex_);
00092
00093 RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
00094 if ( retrieverPos == retrieverMqMap_.end() ) return false;
00095
00096 const double sizeKB = static_cast<double>(size) / 1024;
00097 retrieverPos->second->size_.addSample(sizeKB);
00098
00099 const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
00100
00101 eventTypeMqMap_.addSample(regPtr, sizeKB);
00102
00103 const std::string sourceURL = regPtr->sourceURL();
00104 ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
00105 connectionPos->second->addSample(sizeKB);
00106
00107 totalSize_.addSample(sizeKB);
00108
00109 return true;
00110 }
00111
00112
00113 void DataRetrieverMonitorCollection::getSummaryStats(SummaryStats& stats) const
00114 {
00115 boost::mutex::scoped_lock sl(statsMutex_);
00116
00117 stats.registeredSMs = 0;
00118 stats.activeSMs = 0;
00119
00120 for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00121 itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00122 {
00123 ++stats.registeredSMs;
00124 if ( it->second->connectionStatus_ == CONNECTED )
00125 ++stats.activeSMs;
00126 }
00127
00128 eventTypeMqMap_.getStats(stats.eventTypeStats);
00129
00130 totalSize_.getStats(stats.sizeStats);
00131 }
00132
00133
00134 void DataRetrieverMonitorCollection::getStatsByConnection(ConnectionStats& cs) const
00135 {
00136 boost::mutex::scoped_lock sl(statsMutex_);
00137 cs.clear();
00138
00139 for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00140 itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00141 {
00142 stor::MonitoredQuantity::Stats stats;
00143 it->second->getStats(stats);
00144 cs.insert(ConnectionStats::value_type(it->first, stats));
00145 }
00146 }
00147
00148
00149 void DataRetrieverMonitorCollection::getStatsByEventTypes(EventTypeStatList& etsl) const
00150 {
00151 boost::mutex::scoped_lock sl(statsMutex_);
00152 etsl.clear();
00153
00154 for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00155 itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00156 {
00157 const DataRetrieverMQPtr mq = it->second;
00158 EventTypeStats stats;
00159 stats.regPtr = mq->regPtr_;
00160 stats.connectionStatus = mq->connectionStatus_;
00161 mq->size_.getStats(stats.sizeStats);
00162 etsl.push_back(stats);
00163 }
00164 std::sort(etsl.begin(), etsl.end());
00165 }
00166
00167
00168 void DataRetrieverMonitorCollection::do_calculateStatistics()
00169 {
00170 boost::mutex::scoped_lock sl(statsMutex_);
00171
00172 totalSize_.calculateStatistics();
00173
00174 for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00175 itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00176 {
00177 it->second->size_.calculateStatistics();
00178 }
00179
00180 for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00181 itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00182 {
00183 it->second->calculateStatistics();
00184 }
00185
00186 eventTypeMqMap_.calculateStatistics();
00187 }
00188
00189
00190 void DataRetrieverMonitorCollection::do_reset()
00191 {
00192 boost::mutex::scoped_lock sl(statsMutex_);
00193 totalSize_.reset();
00194 retrieverMqMap_.clear();
00195 connectionMqMap_.clear();
00196 eventTypeMqMap_.clear();
00197 }
00198
00199
00200 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00201 insert(const stor::RegPtr consumer)
00202 {
00203 return (
00204 insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
00205 insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
00206 );
00207 }
00208
00209
00210 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00211 addSample(const stor::RegPtr consumer, const double& sizeKB)
00212 {
00213 return (
00214 addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
00215 addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
00216 );
00217 }
00218
00219
00220 void DataRetrieverMonitorCollection::EventTypeMqMap::
00221 getStats(SummaryStats::EventTypeStatList& eventTypeStats) const
00222 {
00223 eventTypeStats.clear();
00224 eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
00225
00226 for (EventMap::const_iterator it = eventMap_.begin(),
00227 itEnd = eventMap_.end(); it != itEnd; ++it)
00228 {
00229 stor::MonitoredQuantity::Stats etStats;
00230 it->second->getStats(etStats);
00231 eventTypeStats.push_back(
00232 std::make_pair(it->first, etStats));
00233 }
00234
00235 for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
00236 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00237 {
00238 stor::MonitoredQuantity::Stats etStats;
00239 it->second->getStats(etStats);
00240 eventTypeStats.push_back(
00241 std::make_pair(it->first, etStats));
00242 }
00243 }
00244
00245
00246 void DataRetrieverMonitorCollection::EventTypeMqMap::
00247 calculateStatistics()
00248 {
00249 for (EventMap::iterator it = eventMap_.begin(),
00250 itEnd = eventMap_.end(); it != itEnd; ++it)
00251 {
00252 it->second->calculateStatistics();
00253 }
00254 for (DQMEventMap::iterator it = dqmEventMap_.begin(),
00255 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00256 {
00257 it->second->calculateStatistics();
00258 }
00259 }
00260
00261
00262 void DataRetrieverMonitorCollection::EventTypeMqMap::
00263 clear()
00264 {
00265 eventMap_.clear();
00266 dqmEventMap_.clear();
00267 }
00268
00269
00270 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00271 insert(const stor::EventConsRegPtr eventConsumer)
00272 {
00273 if ( eventConsumer == 0 ) return false;
00274 eventMap_.insert(EventMap::value_type(eventConsumer,
00275 stor::MonitoredQuantityPtr(
00276 new stor::MonitoredQuantity( updateInterval_, boost::posix_time::seconds(60) )
00277 )));
00278 return true;
00279 }
00280
00281
00282 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00283 insert(const stor::DQMEventConsRegPtr dqmEventConsumer)
00284 {
00285 if ( dqmEventConsumer == 0 ) return false;
00286 dqmEventMap_.insert(DQMEventMap::value_type(dqmEventConsumer,
00287 stor::MonitoredQuantityPtr(
00288 new stor::MonitoredQuantity( updateInterval_, boost::posix_time::seconds(60) )
00289 )));
00290 return true;
00291 }
00292
00293
00294 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00295 addSample(const stor::EventConsRegPtr eventConsumer, const double& sizeKB)
00296 {
00297 if ( eventConsumer == 0 ) return false;
00298 EventMap::const_iterator pos = eventMap_.find(eventConsumer);
00299 pos->second->addSample(sizeKB);
00300 return true;
00301 }
00302
00303
00304 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00305 addSample(const stor::DQMEventConsRegPtr dqmEventConsumer, const double& sizeKB)
00306 {
00307 if ( dqmEventConsumer == 0 ) return false;
00308 DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
00309 pos->second->addSample(sizeKB);
00310 return true;
00311 }
00312
00313
00314 bool DataRetrieverMonitorCollection::EventTypeStats::operator<(const EventTypeStats& other) const
00315 {
00316 if ( regPtr->sourceURL() != other.regPtr->sourceURL() )
00317 return ( regPtr->sourceURL() < other.regPtr->sourceURL() );
00318
00319 stor::EventConsRegPtr ecrp =
00320 boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
00321 stor::EventConsRegPtr ecrpOther =
00322 boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(other.regPtr);
00323 if ( ecrp && ecrpOther )
00324 return ( *ecrp < *ecrpOther);
00325
00326 stor::DQMEventConsRegPtr dcrp =
00327 boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
00328 stor::DQMEventConsRegPtr dcrpOther =
00329 boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(other.regPtr);
00330 if ( dcrp && dcrpOther )
00331 return ( *dcrp < *dcrpOther);
00332
00333 return false;
00334 }
00335
00336
00337 DataRetrieverMonitorCollection::DataRetrieverMQ::DataRetrieverMQ
00338 (
00339 const stor::RegPtr regPtr,
00340 const stor::utils::Duration_t& updateInterval
00341 ):
00342 regPtr_(regPtr),
00343 connectionStatus_(UNKNOWN),
00344 size_(updateInterval, boost::posix_time::seconds(60))
00345 {}
00346
00347 }
00348
00349
00350 std::ostream& smproxy::operator<<
00351 (
00352 std::ostream& os,
00353 const DataRetrieverMonitorCollection::ConnectionStatus& status
00354 )
00355 {
00356 switch (status)
00357 {
00358 case DataRetrieverMonitorCollection::CONNECTED :
00359 os << "Connected";
00360 break;
00361 case DataRetrieverMonitorCollection::CONNECTION_FAILED :
00362 os << "Could not connect. SM not running?";
00363 break;
00364 case DataRetrieverMonitorCollection::DISCONNECTED :
00365 os << "Lost connection to SM. Did it fail?";
00366 break;
00367 case DataRetrieverMonitorCollection::UNKNOWN :
00368 os << "unknown";
00369 break;
00370 }
00371
00372 return os;
00373 }
00374
00375