CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/EventFilter/SMProxyServer/src/DataRetrieverMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: DataRetrieverMonitorCollection.cc,v 1.3 2011/05/09 11:03:34 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include <boost/pointer_cast.hpp>
00009 
00010 #include "EventFilter/SMProxyServer/interface/Exception.h"
00011 #include "EventFilter/SMProxyServer/interface/DataRetrieverMonitorCollection.h"
00012 
00013 
00014 namespace smproxy {
00015   
00016   DataRetrieverMonitorCollection::DataRetrieverMonitorCollection
00017   (
00018     const stor::utils::Duration_t& updateInterval,
00019     stor::AlarmHandlerPtr alarmHandler
00020   ) :
00021   MonitorCollection(updateInterval),
00022   updateInterval_(updateInterval),
00023   alarmHandler_(alarmHandler),
00024   totals_(updateInterval),
00025   eventTypeMqMap_(updateInterval)
00026   {}
00027   
00028   
00029   ConnectionID DataRetrieverMonitorCollection::addNewConnection
00030   (
00031     const stor::RegPtr regPtr
00032   )
00033   {
00034     boost::mutex::scoped_lock sl(statsMutex_);
00035     ++nextConnectionId_;
00036     
00037     DataRetrieverMQPtr dataRetrieverMQ( new DataRetrieverMQ(regPtr, updateInterval_) );
00038     retrieverMqMap_.insert(
00039       RetrieverMqMap::value_type(nextConnectionId_, dataRetrieverMQ)
00040     );
00041     
00042     eventTypeMqMap_.insert(regPtr);
00043     
00044     connectionMqMap_.insert(ConnectionMqMap::value_type(
00045         regPtr->sourceURL(),
00046         EventMQPtr(new EventMQ(updateInterval_))
00047       ));
00048     
00049     return nextConnectionId_;
00050   }
00051   
00052   
00053   bool DataRetrieverMonitorCollection::setConnectionStatus
00054   (
00055     const ConnectionID& connectionId,
00056     const ConnectionStatus& status
00057   )
00058   {
00059     boost::mutex::scoped_lock sl(statsMutex_);
00060     RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00061     if ( pos == retrieverMqMap_.end() ) return false;
00062     pos->second->connectionStatus_ = status;
00063     return true;
00064   }
00065   
00066   
00067   bool DataRetrieverMonitorCollection::getEventTypeStatsForConnection
00068   (
00069     const ConnectionID& connectionId,
00070     EventTypePerConnectionStats& stats
00071   )
00072   {
00073     boost::mutex::scoped_lock sl(statsMutex_);
00074     RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00075     
00076     if ( pos == retrieverMqMap_.end() ) return false;
00077     
00078     stats.regPtr = pos->second->regPtr_;
00079     stats.connectionStatus = pos->second->connectionStatus_;
00080     pos->second->eventMQ_->getStats(stats.eventStats);
00081     
00082     return true;
00083   }
00084   
00085   
00086   bool DataRetrieverMonitorCollection::addRetrievedSample
00087   (
00088     const ConnectionID& connectionId,
00089     const unsigned int& size
00090   )
00091   {
00092     boost::mutex::scoped_lock sl(statsMutex_);
00093     
00094     RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
00095     if ( retrieverPos == retrieverMqMap_.end() ) return false;
00096     
00097     const double sizeKB = static_cast<double>(size) / 1024;
00098     retrieverPos->second->eventMQ_->size_.addSample(sizeKB);
00099     
00100     const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
00101     
00102     eventTypeMqMap_.addSample(regPtr, sizeKB);
00103     
00104     const std::string sourceURL = regPtr->sourceURL();
00105     ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
00106     connectionPos->second->size_.addSample(sizeKB);
00107     
00108     totals_.size_.addSample(sizeKB);
00109     
00110     return true;
00111   }
00112   
00113   
00114   bool DataRetrieverMonitorCollection::receivedCorruptedEvent
00115   (
00116     const ConnectionID& connectionId
00117   )
00118   {
00119     boost::mutex::scoped_lock sl(statsMutex_);
00120     
00121     RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
00122     if ( retrieverPos == retrieverMqMap_.end() ) return false;
00123     
00124     retrieverPos->second->eventMQ_->corruptedEvents_.addSample(1);
00125     
00126     const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
00127     
00128     eventTypeMqMap_.receivedCorruptedEvent(regPtr);
00129     
00130     const std::string sourceURL = regPtr->sourceURL();
00131     ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
00132     connectionPos->second->corruptedEvents_.addSample(1);
00133     
00134     totals_.corruptedEvents_.addSample(1);
00135 
00136     return true;
00137   }
00138   
00139   
00140   void DataRetrieverMonitorCollection::getSummaryStats(SummaryStats& stats) const
00141   {
00142     boost::mutex::scoped_lock sl(statsMutex_);
00143     
00144     stats.registeredSMs = 0;
00145     stats.activeSMs = 0;
00146     
00147     for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00148            itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00149     {
00150       ++stats.registeredSMs;
00151       if ( it->second->connectionStatus_ == CONNECTED )
00152         ++stats.activeSMs;
00153     }
00154     
00155     eventTypeMqMap_.getStats(stats.eventTypeStats);
00156     
00157     totals_.getStats(stats.totals);
00158   }
00159   
00160   
00161   void DataRetrieverMonitorCollection::getStatsByConnection(ConnectionStats& cs) const
00162   {
00163     boost::mutex::scoped_lock sl(statsMutex_);
00164     cs.clear();
00165     
00166     for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00167            itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00168     {
00169       EventStats stats;
00170       it->second->getStats(stats);
00171       cs.insert(ConnectionStats::value_type(it->first, stats));
00172     }
00173   }
00174   
00175   
00176   void DataRetrieverMonitorCollection::getStatsByEventTypesPerConnection
00177   (
00178     EventTypePerConnectionStatList& etsl
00179   ) const
00180   {
00181     boost::mutex::scoped_lock sl(statsMutex_);
00182     etsl.clear();
00183     
00184     for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00185            itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00186     {
00187       const DataRetrieverMQPtr mq = it->second;
00188       EventTypePerConnectionStats stats;
00189       stats.regPtr = mq->regPtr_;
00190       stats.connectionStatus = mq->connectionStatus_;
00191       mq->eventMQ_->getStats(stats.eventStats);
00192       etsl.push_back(stats);
00193     }
00194     std::sort(etsl.begin(), etsl.end());
00195   }
00196   
00197   
00198   void DataRetrieverMonitorCollection::configureAlarms(AlarmParams const& alarmParams)
00199   {
00200     alarmParams_ = alarmParams;
00201   }
00202   
00203   
00204   void DataRetrieverMonitorCollection::sendAlarms()
00205   {
00206     if ( ! alarmParams_.sendAlarms_ ) return;
00207     
00208     checkForCorruptedEvents();
00209   }
00210   
00211   
00212   void DataRetrieverMonitorCollection::checkForCorruptedEvents()
00213   {
00214     const std::string alarmName = "CorruptedEvents";
00215     
00216     EventStats eventStats;
00217     totals_.getStats(eventStats);
00218     const double corruptedEventRate =
00219       eventStats.corruptedEventsStats.getValueRate(stor::MonitoredQuantity::RECENT);
00220     if ( corruptedEventRate > alarmParams_.corruptedEventRate_ )
00221     {
00222       std::ostringstream msg;
00223       msg << "Received " << corruptedEventRate << " Hz of corrupted events from StorageManagers.";
00224       XCEPT_DECLARE(exception::CorruptedEvents, ex, msg.str());
00225       alarmHandler_->raiseAlarm(alarmName, stor::AlarmHandler::ERROR, ex);
00226     }
00227     else if ( corruptedEventRate < (alarmParams_.corruptedEventRate_ * 0.9) )
00228       // avoid revoking the alarm if we're close to the limit
00229     {
00230       alarmHandler_->revokeAlarm(alarmName);
00231     }
00232   }
00233   
00234   
00235   void DataRetrieverMonitorCollection::do_calculateStatistics()
00236   {
00237     boost::mutex::scoped_lock sl(statsMutex_);
00238     
00239     totals_.calculateStatistics();
00240     
00241     for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00242            itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00243     {
00244       it->second->eventMQ_->calculateStatistics();
00245     }
00246     
00247     for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00248            itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00249     {
00250       it->second->calculateStatistics();
00251     }
00252     
00253     eventTypeMqMap_.calculateStatistics();
00254 
00255     sendAlarms();
00256   }
00257   
00258   
00259   void DataRetrieverMonitorCollection::do_reset()
00260   {
00261     boost::mutex::scoped_lock sl(statsMutex_);
00262     totals_.reset();
00263     retrieverMqMap_.clear();
00264     connectionMqMap_.clear();
00265     eventTypeMqMap_.clear();
00266   }
00267   
00268   
00269   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00270   insert(const stor::RegPtr consumer)
00271   {
00272     return (
00273       insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
00274       insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
00275     );
00276   }
00277   
00278   
00279   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00280   addSample(const stor::RegPtr consumer, const double& sizeKB)
00281   {
00282     return (
00283       addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
00284       addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
00285     );
00286   }
00287   
00288   
00289   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00290   receivedCorruptedEvent(const stor::RegPtr consumer)
00291   {
00292     return (
00293       receivedCorruptedEvent(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
00294       receivedCorruptedEvent(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
00295     );
00296   }
00297   
00298   
00299   void DataRetrieverMonitorCollection::EventTypeMqMap::
00300   getStats(SummaryStats::EventTypeStatList& eventTypeStats) const
00301   {
00302     eventTypeStats.clear();
00303     eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
00304     
00305     for (EventMap::const_iterator it = eventMap_.begin(),
00306            itEnd = eventMap_.end(); it != itEnd; ++it)
00307     {
00308       EventStats eventStats;
00309       it->second->size_.getStats(eventStats.sizeStats);
00310       it->second->corruptedEvents_.getStats(eventStats.corruptedEventsStats);
00311       eventTypeStats.push_back(
00312         std::make_pair(it->first, eventStats));
00313     }
00314     
00315     for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
00316            itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00317     {
00318       EventStats eventStats;
00319       it->second->size_.getStats(eventStats.sizeStats);
00320       it->second->corruptedEvents_.getStats(eventStats.corruptedEventsStats);
00321       eventTypeStats.push_back(
00322         std::make_pair(it->first, eventStats));
00323     }
00324   }
00325   
00326   
00327   void DataRetrieverMonitorCollection::EventTypeMqMap::
00328   calculateStatistics()
00329   {
00330     for (EventMap::iterator it = eventMap_.begin(),
00331            itEnd = eventMap_.end(); it != itEnd; ++it)
00332     {
00333       it->second->size_.calculateStatistics();
00334       it->second->corruptedEvents_.calculateStatistics();
00335     }
00336     for (DQMEventMap::iterator it = dqmEventMap_.begin(),
00337            itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00338     {
00339       it->second->size_.calculateStatistics();
00340       it->second->corruptedEvents_.calculateStatistics();
00341     }
00342   }
00343   
00344   
00345   void DataRetrieverMonitorCollection::EventTypeMqMap::
00346   clear()
00347   {
00348     eventMap_.clear();
00349     dqmEventMap_.clear();
00350   }
00351   
00352   
00353   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00354   insert(const stor::EventConsRegPtr eventConsumer)
00355   {
00356     if ( eventConsumer == 0 ) return false;
00357     eventMap_.insert(EventMap::value_type(eventConsumer,
00358         EventMQPtr( new EventMQ(updateInterval_) )
00359       ));
00360     return true;
00361   }
00362   
00363   
00364   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00365   insert(const stor::DQMEventConsRegPtr dqmEventConsumer)
00366   {
00367     if ( dqmEventConsumer == 0 ) return false;
00368     dqmEventMap_.insert(DQMEventMap::value_type(dqmEventConsumer,
00369         EventMQPtr( new EventMQ(updateInterval_) )
00370       ));
00371     return true;
00372   }
00373   
00374   
00375   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00376   addSample(const stor::EventConsRegPtr eventConsumer, const double& sizeKB)
00377   {
00378     if ( eventConsumer == 0 ) return false;
00379     EventMap::const_iterator pos = eventMap_.find(eventConsumer);
00380     pos->second->size_.addSample(sizeKB);
00381     return true;
00382   }
00383   
00384   
00385   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00386   addSample(const stor::DQMEventConsRegPtr dqmEventConsumer, const double& sizeKB)
00387   {
00388     if ( dqmEventConsumer == 0 ) return false;
00389     DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
00390     pos->second->size_.addSample(sizeKB);
00391     return true;
00392   }
00393   
00394   
00395   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00396   receivedCorruptedEvent(const stor::EventConsRegPtr eventConsumer)
00397   {
00398     if ( eventConsumer == 0 ) return false;
00399     EventMap::const_iterator pos = eventMap_.find(eventConsumer);
00400     pos->second->corruptedEvents_.addSample(1);
00401     return true;
00402   }
00403   
00404   
00405   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00406   receivedCorruptedEvent(const stor::DQMEventConsRegPtr dqmEventConsumer)
00407   {
00408     if ( dqmEventConsumer == 0 ) return false;
00409     DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
00410     pos->second->corruptedEvents_.addSample(1);
00411     return true;
00412   }
00413   
00414   
00415   bool DataRetrieverMonitorCollection::EventTypePerConnectionStats::
00416   operator<(const EventTypePerConnectionStats& other) const
00417   {
00418     if ( regPtr->sourceURL() != other.regPtr->sourceURL() )
00419       return ( regPtr->sourceURL() < other.regPtr->sourceURL() );
00420     
00421     stor::EventConsRegPtr ecrp =
00422       boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
00423     stor::EventConsRegPtr ecrpOther =
00424       boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(other.regPtr);
00425     if ( ecrp && ecrpOther )
00426       return ( *ecrp < *ecrpOther);
00427     
00428     stor::DQMEventConsRegPtr dcrp =
00429       boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
00430     stor::DQMEventConsRegPtr dcrpOther =
00431       boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(other.regPtr);
00432     if ( dcrp && dcrpOther )
00433       return ( *dcrp < *dcrpOther);
00434     
00435     return false;
00436   }
00437   
00438   
00439   DataRetrieverMonitorCollection::EventMQ::EventMQ
00440   (
00441     const stor::utils::Duration_t& updateInterval
00442   ):
00443   size_(updateInterval, boost::posix_time::seconds(60)),
00444   corruptedEvents_(updateInterval, boost::posix_time::seconds(60))
00445   {}
00446 
00447 
00448   void DataRetrieverMonitorCollection::EventMQ::getStats(EventStats& stats) const
00449   {
00450     size_.getStats(stats.sizeStats);
00451     corruptedEvents_.getStats(stats.corruptedEventsStats);
00452   }
00453   
00454   
00455   void DataRetrieverMonitorCollection::EventMQ::calculateStatistics()
00456   {
00457     size_.calculateStatistics();
00458     corruptedEvents_.calculateStatistics();
00459   }
00460   
00461   
00462   void DataRetrieverMonitorCollection::EventMQ::reset()
00463   {
00464     size_.reset();
00465     corruptedEvents_.reset();
00466   }
00467   
00468   
00469   DataRetrieverMonitorCollection::DataRetrieverMQ::DataRetrieverMQ
00470   (
00471     const stor::RegPtr regPtr,
00472     const stor::utils::Duration_t& updateInterval
00473   ):
00474   regPtr_(regPtr),
00475   connectionStatus_(UNKNOWN),
00476   eventMQ_(new EventMQ(updateInterval))
00477   {}
00478   
00479 } // namespace smproxy
00480 
00481 
00482 std::ostream& smproxy::operator<<
00483 (
00484   std::ostream& os,
00485   const DataRetrieverMonitorCollection::ConnectionStatus& status
00486 )
00487 {
00488   switch (status)
00489   {
00490     case DataRetrieverMonitorCollection::CONNECTED :
00491       os << "Connected";
00492       break;
00493     case DataRetrieverMonitorCollection::CONNECTION_FAILED :
00494       os << "Could not connect. SM not running?";
00495       break;
00496     case DataRetrieverMonitorCollection::DISCONNECTED :
00497       os << "Lost connection to SM. Did it fail?";
00498       break;
00499     case DataRetrieverMonitorCollection::UNKNOWN :
00500       os << "unknown";
00501       break;
00502   }
00503   
00504   return os;
00505 }
00506 
00507