CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/SMProxyServer/src/DataRetrieverMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: DataRetrieverMonitorCollection.cc,v 1.1.4.2 2011/03/07 12:01:12 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include <boost/pointer_cast.hpp>
00009 
00010 #include "EventFilter/SMProxyServer/interface/Exception.h"
00011 #include "EventFilter/SMProxyServer/interface/DataRetrieverMonitorCollection.h"
00012 
00013 
00014 namespace smproxy {
00015   
00016   DataRetrieverMonitorCollection::DataRetrieverMonitorCollection
00017   (
00018     const stor::utils::Duration_t& updateInterval
00019   ) :
00020   MonitorCollection(updateInterval),
00021   totalSize_(updateInterval, boost::posix_time::seconds(60)),
00022   updateInterval_(updateInterval),
00023   eventTypeMqMap_(updateInterval)
00024   {}
00025   
00026   
00027   ConnectionID DataRetrieverMonitorCollection::addNewConnection
00028   (
00029     const stor::RegPtr regPtr
00030   )
00031   {
00032     boost::mutex::scoped_lock sl(statsMutex_);
00033     ++nextConnectionId_;
00034     
00035     DataRetrieverMQPtr dataRetrieverMQ( new DataRetrieverMQ(regPtr, updateInterval_) );
00036     retrieverMqMap_.insert(
00037       RetrieverMqMap::value_type(nextConnectionId_, dataRetrieverMQ)
00038     );
00039     
00040     eventTypeMqMap_.insert(regPtr);
00041     
00042     connectionMqMap_.insert(ConnectionMqMap::value_type(regPtr->sourceURL(),
00043         stor::MonitoredQuantityPtr(
00044           new stor::MonitoredQuantity(updateInterval_, boost::posix_time::seconds(60))
00045         )
00046       ));
00047     
00048     return nextConnectionId_;
00049   }
00050   
00051   
00052   bool DataRetrieverMonitorCollection::setConnectionStatus
00053   (
00054     const ConnectionID& connectionId,
00055     const ConnectionStatus& status
00056   )
00057   {
00058     boost::mutex::scoped_lock sl(statsMutex_);
00059     RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00060     if ( pos == retrieverMqMap_.end() ) return false;
00061     pos->second->connectionStatus_ = status;
00062     return true;
00063   }
00064   
00065   
00066   bool DataRetrieverMonitorCollection::getEventTypeStatsForConnection
00067   (
00068     const ConnectionID& connectionId,
00069     EventTypeStats& stats
00070   )
00071   {
00072     boost::mutex::scoped_lock sl(statsMutex_);
00073     RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
00074     
00075     if ( pos == retrieverMqMap_.end() ) return false;
00076     
00077     stats.regPtr = pos->second->regPtr_;
00078     stats.connectionStatus = pos->second->connectionStatus_;
00079     pos->second->size_.getStats(stats.sizeStats);
00080     
00081     return true;
00082   }
00083   
00084   
00085   bool DataRetrieverMonitorCollection::addRetrievedSample
00086   (
00087     const ConnectionID& connectionId,
00088     const unsigned int& size
00089   )
00090   {
00091     boost::mutex::scoped_lock sl(statsMutex_);
00092     
00093     RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
00094     if ( retrieverPos == retrieverMqMap_.end() ) return false;
00095     
00096     const double sizeKB = static_cast<double>(size) / 1024;
00097     retrieverPos->second->size_.addSample(sizeKB);
00098     
00099     const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
00100     
00101     eventTypeMqMap_.addSample(regPtr, sizeKB);
00102     
00103     const std::string sourceURL = regPtr->sourceURL();
00104     ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
00105     connectionPos->second->addSample(sizeKB);
00106     
00107     totalSize_.addSample(sizeKB);
00108     
00109     return true;
00110   }
00111   
00112   
00113   void DataRetrieverMonitorCollection::getSummaryStats(SummaryStats& stats) const
00114   {
00115     boost::mutex::scoped_lock sl(statsMutex_);
00116     
00117     stats.registeredSMs = 0;
00118     stats.activeSMs = 0;
00119     
00120     for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00121            itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00122     {
00123       ++stats.registeredSMs;
00124       if ( it->second->connectionStatus_ == CONNECTED )
00125         ++stats.activeSMs;
00126     }
00127     
00128     eventTypeMqMap_.getStats(stats.eventTypeStats);
00129     
00130     totalSize_.getStats(stats.sizeStats);
00131   }
00132   
00133   
00134   void DataRetrieverMonitorCollection::getStatsByConnection(ConnectionStats& cs) const
00135   {
00136     boost::mutex::scoped_lock sl(statsMutex_);
00137     cs.clear();
00138     
00139     for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00140            itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00141     {
00142       stor::MonitoredQuantity::Stats stats;
00143       it->second->getStats(stats);
00144       cs.insert(ConnectionStats::value_type(it->first, stats));
00145     }
00146   }
00147   
00148   
00149   void DataRetrieverMonitorCollection::getStatsByEventTypes(EventTypeStatList& etsl) const
00150   {
00151     boost::mutex::scoped_lock sl(statsMutex_);
00152     etsl.clear();
00153     
00154     for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00155            itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00156     {
00157       const DataRetrieverMQPtr mq = it->second;
00158       EventTypeStats stats;
00159       stats.regPtr = mq->regPtr_;
00160       stats.connectionStatus = mq->connectionStatus_;
00161       mq->size_.getStats(stats.sizeStats);
00162       etsl.push_back(stats);
00163     }
00164     std::sort(etsl.begin(), etsl.end());
00165   }
00166   
00167   
00168   void DataRetrieverMonitorCollection::do_calculateStatistics()
00169   {
00170     boost::mutex::scoped_lock sl(statsMutex_);
00171     
00172     totalSize_.calculateStatistics();
00173     
00174     for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
00175            itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
00176     {
00177       it->second->size_.calculateStatistics();
00178     }
00179     
00180     for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
00181            itEnd = connectionMqMap_.end(); it != itEnd; ++it)
00182     {
00183       it->second->calculateStatistics();
00184     }
00185     
00186     eventTypeMqMap_.calculateStatistics();
00187   }
00188   
00189   
00190   void DataRetrieverMonitorCollection::do_reset()
00191   {
00192     boost::mutex::scoped_lock sl(statsMutex_);
00193     totalSize_.reset();
00194     retrieverMqMap_.clear();
00195     connectionMqMap_.clear();
00196     eventTypeMqMap_.clear();
00197   }
00198   
00199   
00200   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00201   insert(const stor::RegPtr consumer)
00202   {
00203     return (
00204       insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
00205       insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
00206     );
00207   }
00208   
00209   
00210   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00211   addSample(const stor::RegPtr consumer, const double& sizeKB)
00212   {
00213     return (
00214       addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
00215       addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
00216     );
00217   }
00218   
00219   
00220   void DataRetrieverMonitorCollection::EventTypeMqMap::
00221   getStats(SummaryStats::EventTypeStatList& eventTypeStats) const
00222   {
00223     eventTypeStats.clear();
00224     eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
00225     
00226     for (EventMap::const_iterator it = eventMap_.begin(),
00227            itEnd = eventMap_.end(); it != itEnd; ++it)
00228     {
00229       stor::MonitoredQuantity::Stats etStats;
00230       it->second->getStats(etStats);
00231       eventTypeStats.push_back(
00232         std::make_pair(it->first, etStats));
00233     }
00234     
00235     for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
00236            itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00237     {
00238       stor::MonitoredQuantity::Stats etStats;
00239       it->second->getStats(etStats);
00240       eventTypeStats.push_back(
00241         std::make_pair(it->first, etStats));
00242     }
00243   }
00244   
00245   
00246   void DataRetrieverMonitorCollection::EventTypeMqMap::
00247   calculateStatistics()
00248   {
00249     for (EventMap::iterator it = eventMap_.begin(),
00250            itEnd = eventMap_.end(); it != itEnd; ++it)
00251     {
00252       it->second->calculateStatistics();
00253     }
00254     for (DQMEventMap::iterator it = dqmEventMap_.begin(),
00255            itEnd = dqmEventMap_.end(); it != itEnd; ++it)
00256     {
00257       it->second->calculateStatistics();
00258     }
00259   }
00260   
00261   
00262   void DataRetrieverMonitorCollection::EventTypeMqMap::
00263   clear()
00264   {
00265     eventMap_.clear();
00266     dqmEventMap_.clear();
00267   }
00268   
00269   
00270   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00271   insert(const stor::EventConsRegPtr eventConsumer)
00272   {
00273     if ( eventConsumer == 0 ) return false;
00274     eventMap_.insert(EventMap::value_type(eventConsumer,
00275         stor::MonitoredQuantityPtr(
00276           new stor::MonitoredQuantity( updateInterval_, boost::posix_time::seconds(60) )
00277         )));
00278     return true;
00279   }
00280   
00281   
00282   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00283   insert(const stor::DQMEventConsRegPtr dqmEventConsumer)
00284   {
00285     if ( dqmEventConsumer == 0 ) return false;
00286     dqmEventMap_.insert(DQMEventMap::value_type(dqmEventConsumer,
00287         stor::MonitoredQuantityPtr(
00288           new stor::MonitoredQuantity( updateInterval_, boost::posix_time::seconds(60) )
00289         )));
00290     return true;
00291   }
00292   
00293   
00294   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00295   addSample(const stor::EventConsRegPtr eventConsumer, const double& sizeKB)
00296   {
00297     if ( eventConsumer == 0 ) return false;
00298     EventMap::const_iterator pos = eventMap_.find(eventConsumer);
00299     pos->second->addSample(sizeKB);
00300     return true;
00301   }
00302   
00303   
00304   bool DataRetrieverMonitorCollection::EventTypeMqMap::
00305   addSample(const stor::DQMEventConsRegPtr dqmEventConsumer, const double& sizeKB)
00306   {
00307     if ( dqmEventConsumer == 0 ) return false;
00308     DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
00309     pos->second->addSample(sizeKB);
00310     return true;
00311   }
00312   
00313   
00314   bool DataRetrieverMonitorCollection::EventTypeStats::operator<(const EventTypeStats& other) const
00315   {
00316     if ( regPtr->sourceURL() != other.regPtr->sourceURL() )
00317       return ( regPtr->sourceURL() < other.regPtr->sourceURL() );
00318     
00319     stor::EventConsRegPtr ecrp =
00320       boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
00321     stor::EventConsRegPtr ecrpOther =
00322       boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(other.regPtr);
00323     if ( ecrp && ecrpOther )
00324       return ( *ecrp < *ecrpOther);
00325     
00326     stor::DQMEventConsRegPtr dcrp =
00327       boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
00328     stor::DQMEventConsRegPtr dcrpOther =
00329       boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(other.regPtr);
00330     if ( dcrp && dcrpOther )
00331       return ( *dcrp < *dcrpOther);
00332     
00333     return false;
00334   }
00335   
00336   
00337   DataRetrieverMonitorCollection::DataRetrieverMQ::DataRetrieverMQ
00338   (
00339     const stor::RegPtr regPtr,
00340     const stor::utils::Duration_t& updateInterval
00341   ):
00342   regPtr_(regPtr),
00343   connectionStatus_(UNKNOWN),
00344   size_(updateInterval, boost::posix_time::seconds(60))
00345   {}
00346   
00347 } // namespace smproxy
00348 
00349 
00350 std::ostream& smproxy::operator<<
00351 (
00352   std::ostream& os,
00353   const DataRetrieverMonitorCollection::ConnectionStatus& status
00354 )
00355 {
00356   switch (status)
00357   {
00358     case DataRetrieverMonitorCollection::CONNECTED :
00359       os << "Connected";
00360       break;
00361     case DataRetrieverMonitorCollection::CONNECTION_FAILED :
00362       os << "Could not connect. SM not running?";
00363       break;
00364     case DataRetrieverMonitorCollection::DISCONNECTED :
00365       os << "Lost connection to SM. Did it fail?";
00366       break;
00367     case DataRetrieverMonitorCollection::UNKNOWN :
00368       os << "unknown";
00369       break;
00370   }
00371   
00372   return os;
00373 }
00374 
00375