Go to the documentation of this file.00001
00003
00004 #ifndef EventFilter_StorageManager_ThroughputMonitorCollection_h
00005 #define EventFilter_StorageManager_ThroughputMonitorCollection_h
00006
00007 #include <boost/shared_ptr.hpp>
00008 #include <boost/thread/mutex.hpp>
00009
00010 #include "toolbox/mem/Pool.h"
00011 #include "xdata/Double.h"
00012 #include "xdata/UnsignedInteger32.h"
00013
00014 #include "EventFilter/StorageManager/interface/DQMEventQueue.h"
00015 #include "EventFilter/StorageManager/interface/FragmentQueue.h"
00016 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00017 #include "EventFilter/StorageManager/interface/StreamQueue.h"
00018 #include "EventFilter/StorageManager/interface/Utils.h"
00019
00020 namespace stor {
00021
00031 class ThroughputMonitorCollection : public MonitorCollection
00032 {
00033 public:
00034
00035 explicit ThroughputMonitorCollection
00036 (
00037 const utils::Duration_t& updateInterval,
00038 const unsigned int& throuphputAveragingCycles
00039 );
00040
00041 int getBinCount() const {return binCount_;}
00042
00047 void setMemoryPoolPointer(toolbox::mem::Pool*);
00048
00049 void setFragmentQueue(FragmentQueuePtr fragmentQueue) {
00050 fragmentQueue_ = fragmentQueue;
00051 }
00052
00053 const MonitoredQuantity& getPoolUsageMQ() const {
00054 return poolUsageMQ_;
00055 }
00056 MonitoredQuantity& getPoolUsageMQ() {
00057 return poolUsageMQ_;
00058 }
00059
00060 const MonitoredQuantity& getFragmentQueueEntryCountMQ() const {
00061 return entriesInFragmentQueueMQ_;
00062 }
00063 MonitoredQuantity& getFragmentQueueEntryCountMQ() {
00064 return entriesInFragmentQueueMQ_;
00065 }
00066
00067 const MonitoredQuantity& getFragmentQueueMemoryUsedMQ() const {
00068 return memoryUsedInFragmentQueueMQ_;
00069 }
00070 MonitoredQuantity& getFragmentQueueMemoryUsedMQ() {
00071 return memoryUsedInFragmentQueueMQ_;
00072 }
00073
00074 void addPoppedFragmentSample(double dataSize);
00075
00076 const MonitoredQuantity& getPoppedFragmentSizeMQ() const {
00077 return poppedFragmentSizeMQ_;
00078 }
00079 MonitoredQuantity& getPoppedFragmentSizeMQ() {
00080 return poppedFragmentSizeMQ_;
00081 }
00082
00083 void addFragmentProcessorIdleSample(utils::Duration_t idleTime);
00084
00085 const MonitoredQuantity& getFragmentProcessorIdleMQ() const {
00086 return fragmentProcessorIdleTimeMQ_;
00087 }
00088 MonitoredQuantity& getFragmentProcessorIdleMQ() {
00089 return fragmentProcessorIdleTimeMQ_;
00090 }
00091
00092 const MonitoredQuantity& getFragmentStoreEntryCountMQ() const {
00093 return entriesInFragmentStoreMQ_;
00094 }
00095 MonitoredQuantity& getFragmentStoreEntryCountMQ() {
00096 return entriesInFragmentStoreMQ_;
00097 }
00098
00099 const MonitoredQuantity& getFragmentStoreMemoryUsedMQ() const {
00100 return memoryUsedInFragmentStoreMQ_;
00101 }
00102 MonitoredQuantity& getFragmentStoreMemoryUsedMQ() {
00103 return memoryUsedInFragmentStoreMQ_;
00104 }
00105
00106 void setStreamQueue(StreamQueuePtr streamQueue) {
00107 streamQueue_ = streamQueue;
00108 }
00109
00110 const MonitoredQuantity& getStreamQueueEntryCountMQ() const {
00111 return entriesInStreamQueueMQ_;
00112 }
00113 MonitoredQuantity& getStreamQueueEntryCountMQ() {
00114 return entriesInStreamQueueMQ_;
00115 }
00116
00117 const MonitoredQuantity& getStreamQueueMemoryUsedMQ() const {
00118 return memoryUsedInStreamQueueMQ_;
00119 }
00120 MonitoredQuantity& getStreamQueueMemoryUsedMQ() {
00121 return memoryUsedInStreamQueueMQ_;
00122 }
00123
00124 void addPoppedEventSample(double dataSize);
00125
00126 const MonitoredQuantity& getPoppedEventSizeMQ() const {
00127 return poppedEventSizeMQ_;
00128 }
00129 MonitoredQuantity& getPoppedEventSizeMQ() {
00130 return poppedEventSizeMQ_;
00131 }
00132
00133 void addDiskWriterIdleSample(utils::Duration_t idleTime);
00134
00135 const MonitoredQuantity& getDiskWriterIdleMQ() const {
00136 return diskWriterIdleTimeMQ_;
00137 }
00138 MonitoredQuantity& getDiskWriterIdleMQ() {
00139 return diskWriterIdleTimeMQ_;
00140 }
00141
00142 void addDiskWriteSample(double dataSize);
00143
00144 const MonitoredQuantity& getDiskWriteMQ() const {
00145 return diskWriteSizeMQ_;
00146 }
00147 MonitoredQuantity& getDiskWriteMQ() {
00148 return diskWriteSizeMQ_;
00149 }
00150
00151 void setDQMEventQueue(DQMEventQueuePtr dqmEventQueue) {
00152 dqmEventQueue_ = dqmEventQueue;
00153 }
00154
00155 const MonitoredQuantity& getDQMEventQueueEntryCountMQ() const {
00156 return entriesInDQMEventQueueMQ_;
00157 }
00158 MonitoredQuantity& getDQMEventQueueEntryCountMQ() {
00159 return entriesInDQMEventQueueMQ_;
00160 }
00161
00162 const MonitoredQuantity& getDQMEventQueueMemoryUsedMQ() const {
00163 return memoryUsedInDQMEventQueueMQ_;
00164 }
00165 MonitoredQuantity& getDQMEventQueueMemoryUsedMQ() {
00166 return memoryUsedInDQMEventQueueMQ_;
00167 }
00168
00169 void addPoppedDQMEventSample(double dataSize);
00170
00171 const MonitoredQuantity& getPoppedDQMEventSizeMQ() const {
00172 return poppedDQMEventSizeMQ_;
00173 }
00174 MonitoredQuantity& getPoppedDQMEventSizeMQ() {
00175 return poppedDQMEventSizeMQ_;
00176 }
00177
00178 void addDQMEventProcessorIdleSample(utils::Duration_t idleTime);
00179
00180 const MonitoredQuantity& getDQMEventProcessorIdleMQ() const {
00181 return dqmEventProcessorIdleTimeMQ_;
00182 }
00183 MonitoredQuantity& getDQMEventProcessorIdleMQ() {
00184 return dqmEventProcessorIdleTimeMQ_;
00185 }
00186
00190 inline void setFragmentStoreSize(unsigned int size) {
00191 currentFragmentStoreSize_ = size;
00192 }
00193
00197 inline void setFragmentStoreMemoryUsed(size_t memoryUsed) {
00198 currentFragmentStoreMemoryUsedMB_ = static_cast<double>(memoryUsed) / (1024*1024);
00199 }
00200
00201 struct Stats
00202 {
00203
00204 struct Snapshot
00205 {
00206 utils::Duration_t duration;
00207 utils::TimePoint_t absoluteTime;
00208 double poolUsage;
00209 double entriesInFragmentQueue;
00210 double memoryUsedInFragmentQueue;
00211 double fragmentQueueRate;
00212 double fragmentQueueBandwidth;
00213 double fragmentStoreSize;
00214 double fragmentStoreMemoryUsed;
00215 double entriesInStreamQueue;
00216 double memoryUsedInStreamQueue;
00217 double streamQueueRate;
00218 double streamQueueBandwidth;
00219 double writtenEventsRate;
00220 double writtenEventsBandwidth;
00221 double entriesInDQMQueue;
00222 double memoryUsedInDQMQueue;
00223 double dqmQueueRate;
00224 double dqmQueueBandwidth;
00225
00226 double fragmentProcessorBusy;
00227 double diskWriterBusy;
00228 double dqmEventProcessorBusy;
00229
00230 Snapshot();
00231 Snapshot operator=(const Snapshot&);
00232 Snapshot operator+=(const Snapshot&);
00233 Snapshot operator/=(const double&);
00234
00235 };
00236
00237 typedef std::vector<Snapshot> Snapshots;
00238 Snapshots snapshots;
00239 Snapshot average;
00240
00241 void reset();
00242 };
00243
00247 void getStats(Stats&) const;
00248
00252 void getStats(Stats&, const unsigned int sampleCount) const;
00253
00254
00255 private:
00256
00257
00258 ThroughputMonitorCollection(ThroughputMonitorCollection const&);
00259 ThroughputMonitorCollection& operator=(ThroughputMonitorCollection const&);
00260
00261 virtual void do_calculateStatistics();
00262 virtual void do_reset();
00263 virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00264 virtual void do_updateInfoSpaceItems();
00265
00266 void do_getStats(Stats&, const unsigned int sampleCount) const;
00267
00275 void smoothIdleTimes(MonitoredQuantity::Stats&) const;
00276
00277 int smoothIdleTimesHelper
00278 (
00279 std::vector<double>& idleTimes,
00280 std::vector<utils::Duration_t>& durations,
00281 int firstIndex, int lastIndex
00282 ) const;
00283
00284 void getRateAndBandwidth
00285 (
00286 MonitoredQuantity::Stats& stats,
00287 const int& idx,
00288 double& rate,
00289 double& bandwidth
00290 ) const;
00291
00292 double calcBusyPercentage(
00293 MonitoredQuantity::Stats&,
00294 const int& idx
00295 ) const;
00296
00297 void calcPoolUsage();
00298
00299 const unsigned int binCount_;
00300 mutable boost::mutex statsMutex_;
00301
00302 MonitoredQuantity poolUsageMQ_;
00303 MonitoredQuantity entriesInFragmentQueueMQ_;
00304 MonitoredQuantity memoryUsedInFragmentQueueMQ_;
00305 MonitoredQuantity poppedFragmentSizeMQ_;
00306 MonitoredQuantity fragmentProcessorIdleTimeMQ_;
00307 MonitoredQuantity entriesInFragmentStoreMQ_;
00308 MonitoredQuantity memoryUsedInFragmentStoreMQ_;
00309
00310 MonitoredQuantity entriesInStreamQueueMQ_;
00311 MonitoredQuantity memoryUsedInStreamQueueMQ_;
00312 MonitoredQuantity poppedEventSizeMQ_;
00313 MonitoredQuantity diskWriterIdleTimeMQ_;
00314 MonitoredQuantity diskWriteSizeMQ_;
00315
00316 MonitoredQuantity entriesInDQMEventQueueMQ_;
00317 MonitoredQuantity memoryUsedInDQMEventQueueMQ_;
00318 MonitoredQuantity poppedDQMEventSizeMQ_;
00319 MonitoredQuantity dqmEventProcessorIdleTimeMQ_;
00320
00321 FragmentQueuePtr fragmentQueue_;
00322 StreamQueuePtr streamQueue_;
00323 DQMEventQueuePtr dqmEventQueue_;
00324
00325 unsigned int currentFragmentStoreSize_;
00326 double currentFragmentStoreMemoryUsedMB_;
00327 unsigned int throuphputAveragingCycles_;
00328
00329 toolbox::mem::Pool* pool_;
00330
00331 xdata::UnsignedInteger32 poolUsage_;
00332 xdata::UnsignedInteger32 entriesInFragmentQueue_;
00333 xdata::Double memoryUsedInFragmentQueue_;
00334 xdata::Double fragmentQueueRate_;
00335 xdata::Double fragmentQueueBandwidth_;
00336 xdata::UnsignedInteger32 fragmentStoreSize_;
00337 xdata::Double fragmentStoreMemoryUsed_;
00338 xdata::UnsignedInteger32 entriesInStreamQueue_;
00339 xdata::Double memoryUsedInStreamQueue_;
00340 xdata::Double streamQueueRate_;
00341 xdata::Double streamQueueBandwidth_;
00342 xdata::Double writtenEventsRate_;
00343 xdata::Double writtenEventsBandwidth_;
00344 xdata::UnsignedInteger32 entriesInDQMQueue_;
00345 xdata::Double memoryUsedInDQMQueue_;
00346 xdata::Double dqmQueueRate_;
00347 xdata::Double dqmQueueBandwidth_;
00348
00349 xdata::Double fragmentProcessorBusy_;
00350 xdata::Double diskWriterBusy_;
00351 xdata::Double dqmEventProcessorBusy_;
00352 xdata::Double averagingTime_;
00353 };
00354
00355 }
00356
00357 #endif // EventFilter_StorageManager_ThroughputMonitorCollection_h
00358
00359