CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/EventFilter/StorageManager/interface/ThroughputMonitorCollection.h

Go to the documentation of this file.
00001 // $Id: ThroughputMonitorCollection.h,v 1.19 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #ifndef EventFilter_StorageManager_ThroughputMonitorCollection_h
00005 #define EventFilter_StorageManager_ThroughputMonitorCollection_h
00006 
00007 #include <boost/shared_ptr.hpp>
00008 #include <boost/thread/mutex.hpp>
00009 
00010 #include "toolbox/mem/Pool.h"
00011 #include "xdata/Double.h"
00012 #include "xdata/UnsignedInteger32.h"
00013 
00014 #include "EventFilter/StorageManager/interface/DQMEventQueue.h"
00015 #include "EventFilter/StorageManager/interface/FragmentQueue.h"
00016 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00017 #include "EventFilter/StorageManager/interface/StreamQueue.h"
00018 #include "EventFilter/StorageManager/interface/Utils.h"
00019 
00020 namespace stor {
00021 
00031   class ThroughputMonitorCollection : public MonitorCollection
        // Monitor collection (derived from MonitorCollection) that groups the
        // MonitoredQuantity members for memory pool usage, the fragment, stream
        // and DQM event queues, the fragment store, popped/written data sizes
        // and worker idle times. getStats() fills a Stats summary from them.
        // Only the inline members are defined in this header; everything else
        // is declared here and defined elsewhere.
00032   {
00033   public:
00034 
          // Construct with the update interval and the number of throughput
          // averaging cycles. (The identifier is spelled "throuphput" here and in
          // the matching data member; it is kept as-is because the name is also
          // used outside this header.)
00035     explicit ThroughputMonitorCollection
00036     (
00037       const utils::Duration_t& updateInterval,
00038       const unsigned int& throuphputAveragingCycles
00039     );
00040 
          // Returns binCount_. Note that the member is an unsigned int while the
          // return type is int, so the value is implicitly converted.
00041     int getBinCount() const {return binCount_;}
00042 
          // Hands in the toolbox memory pool. Presumably stored in pool_ and used
          // by calcPoolUsage() -- confirm against the implementation.
00047     void setMemoryPoolPointer(toolbox::mem::Pool*);
00048 
          // Stores the given fragment queue handle in fragmentQueue_.
00049     void setFragmentQueue(FragmentQueuePtr fragmentQueue) {
00050       fragmentQueue_ = fragmentQueue;
00051     }
00052 
          // Each getXxxMQ() pair below returns a const / non-const reference to
          // the MonitoredQuantity data member named in its body.
00053     const MonitoredQuantity& getPoolUsageMQ() const {
00054       return poolUsageMQ_;
00055     }
00056     MonitoredQuantity& getPoolUsageMQ() {
00057       return poolUsageMQ_;
00058     }
00059  
00060     const MonitoredQuantity& getFragmentQueueEntryCountMQ() const {
00061       return entriesInFragmentQueueMQ_;
00062     }
00063     MonitoredQuantity& getFragmentQueueEntryCountMQ() {
00064       return entriesInFragmentQueueMQ_;
00065     }
00066  
00067     const MonitoredQuantity& getFragmentQueueMemoryUsedMQ() const {
00068       return memoryUsedInFragmentQueueMQ_;
00069     }
00070     MonitoredQuantity& getFragmentQueueMemoryUsedMQ() {
00071       return memoryUsedInFragmentQueueMQ_;
00072     }
00073 
          // The addXxxSample() members are only declared in this header.
          // Presumably each one feeds the MonitoredQuantity of the matching name
          // (here poppedFragmentSizeMQ_) -- confirm against the implementation.
00074     void addPoppedFragmentSample(double dataSize);
00075 
00076     const MonitoredQuantity& getPoppedFragmentSizeMQ() const {
00077       return poppedFragmentSizeMQ_;
00078     }
00079     MonitoredQuantity& getPoppedFragmentSizeMQ() {
00080       return poppedFragmentSizeMQ_;
00081     }
00082 
00083     void addFragmentProcessorIdleSample(utils::Duration_t idleTime);
00084 
00085     const MonitoredQuantity& getFragmentProcessorIdleMQ() const {
00086       return fragmentProcessorIdleTimeMQ_;
00087     }
00088     MonitoredQuantity& getFragmentProcessorIdleMQ() {
00089       return fragmentProcessorIdleTimeMQ_;
00090     }
00091 
00092     const MonitoredQuantity& getFragmentStoreEntryCountMQ() const {
00093       return entriesInFragmentStoreMQ_;
00094     }
00095     MonitoredQuantity& getFragmentStoreEntryCountMQ() {
00096       return entriesInFragmentStoreMQ_;
00097     }
00098 
00099     const MonitoredQuantity& getFragmentStoreMemoryUsedMQ() const {
00100       return memoryUsedInFragmentStoreMQ_;
00101     }
00102     MonitoredQuantity& getFragmentStoreMemoryUsedMQ() {
00103       return memoryUsedInFragmentStoreMQ_;
00104     }
00105 
          // Stores the given stream queue handle in streamQueue_.
00106     void setStreamQueue(StreamQueuePtr streamQueue) {
00107       streamQueue_ = streamQueue;
00108     }
00109 
00110     const MonitoredQuantity& getStreamQueueEntryCountMQ() const {
00111       return entriesInStreamQueueMQ_;
00112     }
00113     MonitoredQuantity& getStreamQueueEntryCountMQ() {
00114       return entriesInStreamQueueMQ_;
00115     }
00116 
00117     const MonitoredQuantity& getStreamQueueMemoryUsedMQ() const {
00118       return memoryUsedInStreamQueueMQ_;
00119     }
00120     MonitoredQuantity& getStreamQueueMemoryUsedMQ() {
00121       return memoryUsedInStreamQueueMQ_;
00122     }
00123 
00124     void addPoppedEventSample(double dataSize);
00125 
00126     const MonitoredQuantity& getPoppedEventSizeMQ() const {
00127       return poppedEventSizeMQ_;
00128     }
00129     MonitoredQuantity& getPoppedEventSizeMQ() {
00130       return poppedEventSizeMQ_;
00131     }
00132 
00133     void addDiskWriterIdleSample(utils::Duration_t idleTime);
00134 
00135     const MonitoredQuantity& getDiskWriterIdleMQ() const {
00136       return diskWriterIdleTimeMQ_;
00137     }
00138     MonitoredQuantity& getDiskWriterIdleMQ() {
00139       return diskWriterIdleTimeMQ_;
00140     }
00141 
00142     void addDiskWriteSample(double dataSize);
00143 
00144     const MonitoredQuantity& getDiskWriteMQ() const {
00145       return diskWriteSizeMQ_;
00146     }
00147     MonitoredQuantity& getDiskWriteMQ() {
00148       return diskWriteSizeMQ_;
00149     }
00150 
          // Stores the given DQM event queue handle in dqmEventQueue_.
00151     void setDQMEventQueue(DQMEventQueuePtr dqmEventQueue) {
00152       dqmEventQueue_ = dqmEventQueue;
00153     }
00154 
00155     const MonitoredQuantity& getDQMEventQueueEntryCountMQ() const {
00156       return entriesInDQMEventQueueMQ_;
00157     }
00158     MonitoredQuantity& getDQMEventQueueEntryCountMQ() {
00159       return entriesInDQMEventQueueMQ_;
00160     }
00161 
00162     const MonitoredQuantity& getDQMEventQueueMemoryUsedMQ() const {
00163       return memoryUsedInDQMEventQueueMQ_;
00164     }
00165     MonitoredQuantity& getDQMEventQueueMemoryUsedMQ() {
00166       return memoryUsedInDQMEventQueueMQ_;
00167     }
00168 
00169     void addPoppedDQMEventSample(double dataSize);
00170 
00171     const MonitoredQuantity& getPoppedDQMEventSizeMQ() const {
00172       return poppedDQMEventSizeMQ_;
00173     }
00174     MonitoredQuantity& getPoppedDQMEventSizeMQ() {
00175       return poppedDQMEventSizeMQ_;
00176     }
00177 
00178     void addDQMEventProcessorIdleSample(utils::Duration_t idleTime);
00179 
00180     const MonitoredQuantity& getDQMEventProcessorIdleMQ() const {
00181       return dqmEventProcessorIdleTimeMQ_;
00182     }
00183     MonitoredQuantity& getDQMEventProcessorIdleMQ() {
00184       return dqmEventProcessorIdleTimeMQ_;
00185     }
00186 
          // Records the given value in currentFragmentStoreSize_.
00190     inline void setFragmentStoreSize(unsigned int size) {
00191       currentFragmentStoreSize_ = size;
00192     }
00193 
          // Converts memoryUsed to double, divides it by 1024*1024 and stores the
          // result in currentFragmentStoreMemoryUsedMB_. Given the "MB" suffix of
          // the member, memoryUsed is presumably a byte count -- confirm at the
          // call sites.
00197     inline void setFragmentStoreMemoryUsed(size_t memoryUsed) {
00198       currentFragmentStoreMemoryUsedMB_ = static_cast<double>(memoryUsed) / (1024*1024);
00199     }
00200 
          // Result type filled by getStats(): a list of snapshots plus one
          // Snapshot named "average".
          // NOTE(review): std::vector is used below, but no #include <vector> is
          // visible in this header; it presumably arrives through another include.
00201     struct Stats
00202     {
00203 
            // One set of throughput values. Units, where stated, are given by
            // the trailing comment on each field.
00204       struct Snapshot
00205       {
00206         utils::Duration_t duration;
00207         utils::TimePoint_t absoluteTime;
00208         double poolUsage; //bytes
00209         double entriesInFragmentQueue;
00210         double memoryUsedInFragmentQueue; //MB
00211         double fragmentQueueRate; //Hz
00212         double fragmentQueueBandwidth; //MB/s
00213         double fragmentStoreSize;
00214         double fragmentStoreMemoryUsed; //MB
00215         double entriesInStreamQueue;
00216         double memoryUsedInStreamQueue; //MB
00217         double streamQueueRate; //Hz
00218         double streamQueueBandwidth; //MB/s
00219         double writtenEventsRate; //Hz
00220         double writtenEventsBandwidth; //MB/s
00221         double entriesInDQMQueue;
00222         double memoryUsedInDQMQueue; //MB
00223         double dqmQueueRate; //Hz
00224         double dqmQueueBandwidth; //MB/s
00225         
00226         double fragmentProcessorBusy; //%
00227         double diskWriterBusy; //%
00228         double dqmEventProcessorBusy; //%
00229 
              // NOTE(review): the three operators below are declared to return
              // Snapshot by value, not Snapshot&, so every use yields a copy.
              // Changing this needs a matching change in the definitions, which
              // are not part of this header.
00230         Snapshot();
00231         Snapshot operator=(const Snapshot&);
00232         Snapshot operator+=(const Snapshot&);
00233         Snapshot operator/=(const double&);
00234 
00235       };
00236       
00237       typedef std::vector<Snapshot> Snapshots;
00238       Snapshots snapshots; // time sorted with newest entry first
00239       Snapshot average;
00240 
            // Declared only; defined elsewhere.
00241       void reset();
00242     };
00243 
          // Fills the passed Stats object. Declared only; defined elsewhere.
00247     void getStats(Stats&) const;
00248 
          // As above, with an additional sampleCount argument. Its exact meaning
          // is set by the implementation -- presumably the number of snapshots
          // to take into account; confirm.
00252     void getStats(Stats&, const unsigned int sampleCount) const;
00253 
00254 
00255   private:
00256 
00257     //Prevent copying of the ThroughputMonitorCollection
00258     ThroughputMonitorCollection(ThroughputMonitorCollection const&);
00259     ThroughputMonitorCollection& operator=(ThroughputMonitorCollection const&);
00260 
          // Virtual hooks, declared private. Presumably these implement the
          // customization points of MonitorCollection -- the base class is not
          // visible in this header, confirm there.
00261     virtual void do_calculateStatistics();
00262     virtual void do_reset();
00263     virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00264     virtual void do_updateInfoSpaceItems();
00265 
          // Private helpers; all are declared here and defined elsewhere.
00266     void do_getStats(Stats&, const unsigned int sampleCount) const;
00267 
00275     void smoothIdleTimes(MonitoredQuantity::Stats&) const;
00276 
00277     int smoothIdleTimesHelper
00278     (
00279       std::vector<double>& idleTimes,
00280       std::vector<utils::Duration_t>& durations,
00281       int firstIndex, int lastIndex
00282     ) const;
00283 
00284     void getRateAndBandwidth
00285     (
00286       MonitoredQuantity::Stats& stats,
00287       const int& idx,
00288       double& rate,
00289       double& bandwidth
00290     ) const;
00291 
00292     double calcBusyPercentage(
00293       MonitoredQuantity::Stats&,
00294       const int& idx
00295     ) const;
00296 
00297     void calcPoolUsage();
00298 
          // binCount_ is const, so it can only be set in the constructor's
          // initializer list. statsMutex_ is mutable and can therefore be locked
          // from const member functions; which data it guards is decided in the
          // implementation.
00299     const unsigned int binCount_;
00300     mutable boost::mutex statsMutex_;
00301 
          // Monitored quantities handed out by the getXxxMQ() accessors above.
00302     MonitoredQuantity poolUsageMQ_;
00303     MonitoredQuantity entriesInFragmentQueueMQ_;
00304     MonitoredQuantity memoryUsedInFragmentQueueMQ_;
00305     MonitoredQuantity poppedFragmentSizeMQ_;
00306     MonitoredQuantity fragmentProcessorIdleTimeMQ_;
00307     MonitoredQuantity entriesInFragmentStoreMQ_;
00308     MonitoredQuantity memoryUsedInFragmentStoreMQ_;
00309 
00310     MonitoredQuantity entriesInStreamQueueMQ_;
00311     MonitoredQuantity memoryUsedInStreamQueueMQ_;
00312     MonitoredQuantity poppedEventSizeMQ_;
00313     MonitoredQuantity diskWriterIdleTimeMQ_;
00314     MonitoredQuantity diskWriteSizeMQ_;
00315 
00316     MonitoredQuantity entriesInDQMEventQueueMQ_;
00317     MonitoredQuantity memoryUsedInDQMEventQueueMQ_;
00318     MonitoredQuantity poppedDQMEventSizeMQ_;
00319     MonitoredQuantity dqmEventProcessorIdleTimeMQ_;
00320 
          // Queue handles set through setFragmentQueue(), setStreamQueue() and
          // setDQMEventQueue().
00321     FragmentQueuePtr fragmentQueue_;
00322     StreamQueuePtr streamQueue_;
00323     DQMEventQueuePtr dqmEventQueue_;
00324 
          // Latest values written by setFragmentStoreSize() and
          // setFragmentStoreMemoryUsed().
00325     unsigned int currentFragmentStoreSize_;
00326     double currentFragmentStoreMemoryUsedMB_;
00327     unsigned int throuphputAveragingCycles_;
00328 
          // Raw pointer to the toolbox memory pool; nothing in this header
          // deletes it. Presumably set via setMemoryPoolPointer() -- confirm.
00329     toolbox::mem::Pool* pool_;
00330 
          // xdata-typed values; presumably the items handled by
          // do_appendInfoSpaceItems() / do_updateInfoSpaceItems() -- confirm.
          // NOTE(review): the comments on streamQueueRate_ and
          // streamQueueBandwidth_ used to say "fragment queue"; they were changed
          // to "stream queue" to match the member names -- confirm.
00331     xdata::UnsignedInteger32 poolUsage_;                 //I2O message pool usage in bytes
00332     xdata::UnsignedInteger32 entriesInFragmentQueue_;    //Instantaneous number of fragments in fragment queue
00333     xdata::Double            memoryUsedInFragmentQueue_; //Instantaneous memory usage of events in fragment queue (MB)
00334     xdata::Double            fragmentQueueRate_;         //Rate of fragments popped from fragment queue
00335     xdata::Double            fragmentQueueBandwidth_;    //Bandwidth of fragments popped from fragment queue (MB/s)
00336     xdata::UnsignedInteger32 fragmentStoreSize_;         //Instantaneous number of fragments in fragment store
00337     xdata::Double            fragmentStoreMemoryUsed_;   //Instantaneous memory usage of events in fragment store (MB)
00338     xdata::UnsignedInteger32 entriesInStreamQueue_;      //Instantaneous number of events in stream queue
00339     xdata::Double            memoryUsedInStreamQueue_;   //Instantaneous memory usage of events in stream queue (MB)
00340     xdata::Double            streamQueueRate_;           //Rate of events popped from stream queue
00341     xdata::Double            streamQueueBandwidth_;      //Bandwidth of events popped from stream queue (MB/s)
00342     xdata::Double            writtenEventsRate_;         //Rate of (non-unique) events written to disk
00343     xdata::Double            writtenEventsBandwidth_;    //Bandwidth of (non-unique) events written to disk
00344     xdata::UnsignedInteger32 entriesInDQMQueue_;         //Instantaneous number of events in dqm event queue
00345     xdata::Double            memoryUsedInDQMQueue_;      //Instantaneous memory usage of events in dqm event queue (MB)
00346     xdata::Double            dqmQueueRate_;              //Rate of events popped from dqm event queue
00347     xdata::Double            dqmQueueBandwidth_;         //Bandwidth of events popped from dqm event queue (MB/s)
00348 
00349     xdata::Double            fragmentProcessorBusy_;     //Fragment processor busy percentage
00350     xdata::Double            diskWriterBusy_;            //Disk writer busy percentage
00351     xdata::Double            dqmEventProcessorBusy_;     //DQM event processor busy percentage
00352     xdata::Double            averagingTime_;                    //Time in s over which above values are averaged
00353   };
00354   
00355 } // namespace stor
00356 
00357 #endif // EventFilter_StorageManager_ThroughputMonitorCollection_h 
00358 
00359