Go to the documentation of this file.00001
00003
#include <algorithm>
#include <iomanip>
#include <sstream>
#include <string>

#include <boost/pointer_cast.hpp>

#include "EventFilter/SMProxyServer/interface/Exception.h"
#include "EventFilter/SMProxyServer/interface/DataRetrieverMonitorCollection.h"
00012
00013
00014 namespace smproxy {
00015
00016 DataRetrieverMonitorCollection::DataRetrieverMonitorCollection
00017 (
00018 const stor::utils::Duration_t& updateInterval,
00019 stor::AlarmHandlerPtr alarmHandler
00020 ) :
00021 MonitorCollection(updateInterval),
00022 updateInterval_(updateInterval),
00023 alarmHandler_(alarmHandler),
00024 totals_(updateInterval),
00025 eventTypeMqMap_(updateInterval)
00026 {}
00027
00028
00029 ConnectionID DataRetrieverMonitorCollection::addNewConnection
00030 (
00031 const stor::RegPtr regPtr
00032 )
00033 {
00034 boost::mutex::scoped_lock sl(statsMutex_);
00035 ++nextConnectionId_;
00036
00037 DataRetrieverMQPtr dataRetrieverMQ( new DataRetrieverMQ(regPtr, updateInterval_) );
00038 retrieverMqMap_.insert(
00039 RetrieverMqMap::value_type(nextConnectionId_, dataRetrieverMQ)
00040 );
00041
00042 eventTypeMqMap_.insert(regPtr);
00043
00044 connectionMqMap_.insert(ConnectionMqMap::value_type(
00045 regPtr->sourceURL(),
00046 EventMQPtr(new EventMQ(updateInterval_))
00047 ));
00048
00049 return nextConnectionId_;
00050 }
00051
00052
00053 bool DataRetrieverMonitorCollection::setConnectionStatus
00054 (
00055 const ConnectionID& connectionId,
00056 const ConnectionStatus& status
00057 )
00058 {
00059 boost::mutex::scoped_lock sl(statsMutex_);
00060 RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00061 if ( pos == retrieverMqMap_.end() ) return false;
00062 pos->second->connectionStatus_ = status;
00063 return true;
00064 }
00065
00066
00067 bool DataRetrieverMonitorCollection::getEventTypeStatsForConnection
00068 (
00069 const ConnectionID& connectionId,
00070 EventTypePerConnectionStats& stats
00071 )
00072 {
00073 boost::mutex::scoped_lock sl(statsMutex_);
00074 RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00075
00076 if ( pos == retrieverMqMap_.end() ) return false;
00077
00078 stats.regPtr = pos->second->regPtr_;
00079 stats.connectionStatus = pos->second->connectionStatus_;
00080 pos->second->eventMQ_->getStats(stats.eventStats);
00081
00082 return true;
00083 }
00084
00085
00086 bool DataRetrieverMonitorCollection::addRetrievedSample
00087 (
00088 const ConnectionID& connectionId,
00089 const unsigned int& size
00090 )
00091 {
00092 boost::mutex::scoped_lock sl(statsMutex_);
00093
00094 RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
00095 if ( retrieverPos == retrieverMqMap_.end() ) return false;
00096
00097 const double sizeKB = static_cast<double>(size) / 1024;
00098 retrieverPos->second->eventMQ_->size_.addSample(sizeKB);
00099
00100 const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
00101
00102 eventTypeMqMap_.addSample(regPtr, sizeKB);
00103
00104 const std::string sourceURL = regPtr->sourceURL();
00105 ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
00106 connectionPos->second->size_.addSample(sizeKB);
00107
00108 totals_.size_.addSample(sizeKB);
00109
00110 return true;
00111 }
00112
00113
00114 bool DataRetrieverMonitorCollection::receivedCorruptedEvent
00115 (
00116 const ConnectionID& connectionId
00117 )
00118 {
00119 boost::mutex::scoped_lock sl(statsMutex_);
00120
00121 RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
00122 if ( retrieverPos == retrieverMqMap_.end() ) return false;
00123
00124 retrieverPos->second->eventMQ_->corruptedEvents_.addSample(1);
00125
00126 const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
00127
00128 eventTypeMqMap_.receivedCorruptedEvent(regPtr);
00129
00130 const std::string sourceURL = regPtr->sourceURL();
00131 ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
00132 connectionPos->second->corruptedEvents_.addSample(1);
00133
00134 totals_.corruptedEvents_.addSample(1);
00135
00136 return true;
00137 }
00138
00139
00140 void DataRetrieverMonitorCollection::getSummaryStats(SummaryStats& stats) const
00141 {
00142 boost::mutex::scoped_lock sl(statsMutex_);
00143
00144 stats.registeredSMs = 0;
00145 stats.activeSMs = 0;
00146
00147 for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00148 itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00149 {
00150 ++stats.registeredSMs;
00151 if ( it->second->connectionStatus_ == CONNECTED )
00152 ++stats.activeSMs;
00153 }
00154
00155 eventTypeMqMap_.getStats(stats.eventTypeStats);
00156
00157 totals_.getStats(stats.totals);
00158 }
00159
00160
00161 void DataRetrieverMonitorCollection::getStatsByConnection(ConnectionStats& cs) const
00162 {
00163 boost::mutex::scoped_lock sl(statsMutex_);
00164 cs.clear();
00165
00166 for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00167 itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00168 {
00169 EventStats stats;
00170 it->second->getStats(stats);
00171 cs.insert(ConnectionStats::value_type(it->first, stats));
00172 }
00173 }
00174
00175
00176 void DataRetrieverMonitorCollection::getStatsByEventTypesPerConnection
00177 (
00178 EventTypePerConnectionStatList& etsl
00179 ) const
00180 {
00181 boost::mutex::scoped_lock sl(statsMutex_);
00182 etsl.clear();
00183
00184 for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00185 itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00186 {
00187 const DataRetrieverMQPtr mq = it->second;
00188 EventTypePerConnectionStats stats;
00189 stats.regPtr = mq->regPtr_;
00190 stats.connectionStatus = mq->connectionStatus_;
00191 mq->eventMQ_->getStats(stats.eventStats);
00192 etsl.push_back(stats);
00193 }
00194 std::sort(etsl.begin(), etsl.end());
00195 }
00196
00197
00198 void DataRetrieverMonitorCollection::configureAlarms(AlarmParams const& alarmParams)
00199 {
00200 alarmParams_ = alarmParams;
00201 }
00202
00203
00204 void DataRetrieverMonitorCollection::sendAlarms()
00205 {
00206 if ( ! alarmParams_.sendAlarms_ ) return;
00207
00208 checkForCorruptedEvents();
00209 }
00210
00211
00212 void DataRetrieverMonitorCollection::checkForCorruptedEvents()
00213 {
00214 const std::string alarmName = "CorruptedEvents";
00215
00216 EventStats eventStats;
00217 totals_.getStats(eventStats);
00218 const double corruptedEventRate =
00219 eventStats.corruptedEventsStats.getValueRate(stor::MonitoredQuantity::RECENT);
00220 if ( corruptedEventRate > alarmParams_.corruptedEventRate_ )
00221 {
00222 std::ostringstream msg;
00223 msg << "Received " << corruptedEventRate << " Hz of corrupted events from StorageManagers.";
00224 XCEPT_DECLARE(exception::CorruptedEvents, ex, msg.str());
00225 alarmHandler_->raiseAlarm(alarmName, stor::AlarmHandler::ERROR, ex);
00226 }
00227 else if ( corruptedEventRate < (alarmParams_.corruptedEventRate_ * 0.9) )
00228
00229 {
00230 alarmHandler_->revokeAlarm(alarmName);
00231 }
00232 }
00233
00234
00235 void DataRetrieverMonitorCollection::do_calculateStatistics()
00236 {
00237 boost::mutex::scoped_lock sl(statsMutex_);
00238
00239 totals_.calculateStatistics();
00240
00241 for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00242 itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00243 {
00244 it->second->eventMQ_->calculateStatistics();
00245 }
00246
00247 for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00248 itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00249 {
00250 it->second->calculateStatistics();
00251 }
00252
00253 eventTypeMqMap_.calculateStatistics();
00254
00255 sendAlarms();
00256 }
00257
00258
00259 void DataRetrieverMonitorCollection::do_reset()
00260 {
00261 boost::mutex::scoped_lock sl(statsMutex_);
00262 totals_.reset();
00263 retrieverMqMap_.clear();
00264 connectionMqMap_.clear();
00265 eventTypeMqMap_.clear();
00266 }
00267
00268
00269 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00270 insert(const stor::RegPtr consumer)
00271 {
00272 return (
00273 insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
00274 insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
00275 );
00276 }
00277
00278
00279 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00280 addSample(const stor::RegPtr consumer, const double& sizeKB)
00281 {
00282 return (
00283 addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
00284 addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
00285 );
00286 }
00287
00288
00289 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00290 receivedCorruptedEvent(const stor::RegPtr consumer)
00291 {
00292 return (
00293 receivedCorruptedEvent(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
00294 receivedCorruptedEvent(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
00295 );
00296 }
00297
00298
00299 void DataRetrieverMonitorCollection::EventTypeMqMap::
00300 getStats(SummaryStats::EventTypeStatList& eventTypeStats) const
00301 {
00302 eventTypeStats.clear();
00303 eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
00304
00305 for (EventMap::const_iterator it = eventMap_.begin(),
00306 itEnd = eventMap_.end(); it != itEnd; ++it)
00307 {
00308 EventStats eventStats;
00309 it->second->size_.getStats(eventStats.sizeStats);
00310 it->second->corruptedEvents_.getStats(eventStats.corruptedEventsStats);
00311 eventTypeStats.push_back(
00312 std::make_pair(it->first, eventStats));
00313 }
00314
00315 for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
00316 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00317 {
00318 EventStats eventStats;
00319 it->second->size_.getStats(eventStats.sizeStats);
00320 it->second->corruptedEvents_.getStats(eventStats.corruptedEventsStats);
00321 eventTypeStats.push_back(
00322 std::make_pair(it->first, eventStats));
00323 }
00324 }
00325
00326
00327 void DataRetrieverMonitorCollection::EventTypeMqMap::
00328 calculateStatistics()
00329 {
00330 for (EventMap::iterator it = eventMap_.begin(),
00331 itEnd = eventMap_.end(); it != itEnd; ++it)
00332 {
00333 it->second->size_.calculateStatistics();
00334 it->second->corruptedEvents_.calculateStatistics();
00335 }
00336 for (DQMEventMap::iterator it = dqmEventMap_.begin(),
00337 itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00338 {
00339 it->second->size_.calculateStatistics();
00340 it->second->corruptedEvents_.calculateStatistics();
00341 }
00342 }
00343
00344
00345 void DataRetrieverMonitorCollection::EventTypeMqMap::
00346 clear()
00347 {
00348 eventMap_.clear();
00349 dqmEventMap_.clear();
00350 }
00351
00352
00353 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00354 insert(const stor::EventConsRegPtr eventConsumer)
00355 {
00356 if ( eventConsumer == 0 ) return false;
00357 eventMap_.insert(EventMap::value_type(eventConsumer,
00358 EventMQPtr( new EventMQ(updateInterval_) )
00359 ));
00360 return true;
00361 }
00362
00363
00364 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00365 insert(const stor::DQMEventConsRegPtr dqmEventConsumer)
00366 {
00367 if ( dqmEventConsumer == 0 ) return false;
00368 dqmEventMap_.insert(DQMEventMap::value_type(dqmEventConsumer,
00369 EventMQPtr( new EventMQ(updateInterval_) )
00370 ));
00371 return true;
00372 }
00373
00374
00375 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00376 addSample(const stor::EventConsRegPtr eventConsumer, const double& sizeKB)
00377 {
00378 if ( eventConsumer == 0 ) return false;
00379 EventMap::const_iterator pos = eventMap_.find(eventConsumer);
00380 pos->second->size_.addSample(sizeKB);
00381 return true;
00382 }
00383
00384
00385 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00386 addSample(const stor::DQMEventConsRegPtr dqmEventConsumer, const double& sizeKB)
00387 {
00388 if ( dqmEventConsumer == 0 ) return false;
00389 DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
00390 pos->second->size_.addSample(sizeKB);
00391 return true;
00392 }
00393
00394
00395 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00396 receivedCorruptedEvent(const stor::EventConsRegPtr eventConsumer)
00397 {
00398 if ( eventConsumer == 0 ) return false;
00399 EventMap::const_iterator pos = eventMap_.find(eventConsumer);
00400 pos->second->corruptedEvents_.addSample(1);
00401 return true;
00402 }
00403
00404
00405 bool DataRetrieverMonitorCollection::EventTypeMqMap::
00406 receivedCorruptedEvent(const stor::DQMEventConsRegPtr dqmEventConsumer)
00407 {
00408 if ( dqmEventConsumer == 0 ) return false;
00409 DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
00410 pos->second->corruptedEvents_.addSample(1);
00411 return true;
00412 }
00413
00414
00415 bool DataRetrieverMonitorCollection::EventTypePerConnectionStats::
00416 operator<(const EventTypePerConnectionStats& other) const
00417 {
00418 if ( regPtr->sourceURL() != other.regPtr->sourceURL() )
00419 return ( regPtr->sourceURL() < other.regPtr->sourceURL() );
00420
00421 stor::EventConsRegPtr ecrp =
00422 boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
00423 stor::EventConsRegPtr ecrpOther =
00424 boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(other.regPtr);
00425 if ( ecrp && ecrpOther )
00426 return ( *ecrp < *ecrpOther);
00427
00428 stor::DQMEventConsRegPtr dcrp =
00429 boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
00430 stor::DQMEventConsRegPtr dcrpOther =
00431 boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(other.regPtr);
00432 if ( dcrp && dcrpOther )
00433 return ( *dcrp < *dcrpOther);
00434
00435 return false;
00436 }
00437
00438
00439 DataRetrieverMonitorCollection::EventMQ::EventMQ
00440 (
00441 const stor::utils::Duration_t& updateInterval
00442 ):
00443 size_(updateInterval, boost::posix_time::seconds(60)),
00444 corruptedEvents_(updateInterval, boost::posix_time::seconds(60))
00445 {}
00446
00447
00448 void DataRetrieverMonitorCollection::EventMQ::getStats(EventStats& stats) const
00449 {
00450 size_.getStats(stats.sizeStats);
00451 corruptedEvents_.getStats(stats.corruptedEventsStats);
00452 }
00453
00454
00455 void DataRetrieverMonitorCollection::EventMQ::calculateStatistics()
00456 {
00457 size_.calculateStatistics();
00458 corruptedEvents_.calculateStatistics();
00459 }
00460
00461
00462 void DataRetrieverMonitorCollection::EventMQ::reset()
00463 {
00464 size_.reset();
00465 corruptedEvents_.reset();
00466 }
00467
00468
00469 DataRetrieverMonitorCollection::DataRetrieverMQ::DataRetrieverMQ
00470 (
00471 const stor::RegPtr regPtr,
00472 const stor::utils::Duration_t& updateInterval
00473 ):
00474 regPtr_(regPtr),
00475 connectionStatus_(UNKNOWN),
00476 eventMQ_(new EventMQ(updateInterval))
00477 {}
00478
00479 }
00480
00481
00482 std::ostream& smproxy::operator<<
00483 (
00484 std::ostream& os,
00485 const DataRetrieverMonitorCollection::ConnectionStatus& status
00486 )
00487 {
00488 switch (status)
00489 {
00490 case DataRetrieverMonitorCollection::CONNECTED :
00491 os << "Connected";
00492 break;
00493 case DataRetrieverMonitorCollection::CONNECTION_FAILED :
00494 os << "Could not connect. SM not running?";
00495 break;
00496 case DataRetrieverMonitorCollection::DISCONNECTED :
00497 os << "Lost connection to SM. Did it fail?";
00498 break;
00499 case DataRetrieverMonitorCollection::UNKNOWN :
00500 os << "unknown";
00501 break;
00502 }
00503
00504 return os;
00505 }
00506
00507