CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/StorageManager/src/ThroughputMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: ThroughputMonitorCollection.cc,v 1.24 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/ThroughputMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/Utils.h"
00006 
00007 using namespace stor;
00008 
00009 ThroughputMonitorCollection::ThroughputMonitorCollection
00010 (
00011   const utils::Duration_t& updateInterval,
00012   const unsigned int& throuphputAveragingCycles
00013 ) :
00014   MonitorCollection(updateInterval),
00015   binCount_(300),
00016   poolUsageMQ_(updateInterval, updateInterval*binCount_),
00017   entriesInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
00018   memoryUsedInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
00019   poppedFragmentSizeMQ_(updateInterval, updateInterval*binCount_),
00020   fragmentProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
00021   entriesInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
00022   memoryUsedInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
00023   entriesInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
00024   memoryUsedInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
00025   poppedEventSizeMQ_(updateInterval, updateInterval*binCount_),
00026   diskWriterIdleTimeMQ_(updateInterval, updateInterval*binCount_),
00027   diskWriteSizeMQ_(updateInterval, updateInterval*binCount_),
00028   entriesInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
00029   memoryUsedInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
00030   poppedDQMEventSizeMQ_(updateInterval, updateInterval*binCount_),
00031   dqmEventProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
00032   currentFragmentStoreSize_(0),
00033   currentFragmentStoreMemoryUsedMB_(0),
00034   throuphputAveragingCycles_(throuphputAveragingCycles),
00035   pool_(0)
00036 {}
00037 
00038 
00039 void ThroughputMonitorCollection::setMemoryPoolPointer(toolbox::mem::Pool* pool)
00040 {
00041   if ( ! pool_)
00042     pool_ = pool;
00043 }
00044 
00045 
00046 void ThroughputMonitorCollection::addPoppedFragmentSample(double dataSize)
00047 {
00048   poppedFragmentSizeMQ_.addSample(dataSize);
00049 }
00050 
00051 
00052 void ThroughputMonitorCollection::
00053 addFragmentProcessorIdleSample(utils::Duration_t idleTime)
00054 {
00055   fragmentProcessorIdleTimeMQ_.addSample(utils::durationToSeconds(idleTime));
00056 }
00057 
00058 
00059 void ThroughputMonitorCollection::addPoppedEventSample(double dataSize)
00060 {
00061   poppedEventSizeMQ_.addSample(dataSize);
00062 }
00063 
00064 
00065 void ThroughputMonitorCollection::
00066 addDiskWriterIdleSample(utils::Duration_t idleTime)
00067 {
00068   diskWriterIdleTimeMQ_.addSample(utils::durationToSeconds(idleTime));
00069 }
00070 
00071 
00072 void ThroughputMonitorCollection::addDiskWriteSample(double dataSize)
00073 {
00074   diskWriteSizeMQ_.addSample(dataSize);
00075 }
00076 
00077 
00078 void ThroughputMonitorCollection::addPoppedDQMEventSample(double dataSize)
00079 {
00080   poppedDQMEventSizeMQ_.addSample(dataSize);
00081 }
00082 
00083 
00084 void ThroughputMonitorCollection::
00085 addDQMEventProcessorIdleSample(utils::Duration_t idleTime)
00086 {
00087   dqmEventProcessorIdleTimeMQ_.addSample(utils::durationToSeconds(idleTime));
00088 }
00089 
00090 
00091 void ThroughputMonitorCollection::calcPoolUsage()
00092 {
00093   if (pool_)
00094   {
00095     try {
00096       pool_->lock();
00097       poolUsageMQ_.addSample(pool_->getMemoryUsage().getUsed());
00098       pool_->unlock();
00099     }
00100     catch (...)
00101     {
00102       pool_->unlock();
00103     }
00104   }
00105   poolUsageMQ_.calculateStatistics();
00106 }
00107 
00108 
00109 void ThroughputMonitorCollection::getStats(Stats& stats) const
00110 {
00111   boost::mutex::scoped_lock sl(statsMutex_);
00112   do_getStats(stats, binCount_);
00113 }
00114 
00115 
00116 void ThroughputMonitorCollection::getStats(Stats& stats, const unsigned int sampleCount) const
00117 {
00118   boost::mutex::scoped_lock sl(statsMutex_);
00119   do_getStats(stats, sampleCount);
00120 }
00121 
00122 
00123 void ThroughputMonitorCollection::do_getStats(Stats& stats, const unsigned int sampleCount) const
00124 {
00125   MonitoredQuantity::Stats fqEntryCountMQ, fqMemoryUsedMQ, fragSizeMQ;
00126   MonitoredQuantity::Stats fpIdleMQ, fsEntryCountMQ, fsMemoryUsedMQ;
00127   MonitoredQuantity::Stats sqEntryCountMQ, sqMemoryUsedMQ, eventSizeMQ, dwIdleMQ, diskWriteMQ;
00128   MonitoredQuantity::Stats dqEntryCountMQ, dqMemoryUsedMQ, dqmEventSizeMQ, dqmIdleMQ, poolUsageMQ;
00129   poolUsageMQ_.getStats(poolUsageMQ);
00130   entriesInFragmentQueueMQ_.getStats(fqEntryCountMQ);
00131   memoryUsedInFragmentQueueMQ_.getStats(fqMemoryUsedMQ);
00132   poppedFragmentSizeMQ_.getStats(fragSizeMQ);
00133   fragmentProcessorIdleTimeMQ_.getStats(fpIdleMQ);
00134   entriesInFragmentStoreMQ_.getStats(fsEntryCountMQ);
00135   memoryUsedInFragmentStoreMQ_.getStats(fsMemoryUsedMQ);
00136   entriesInStreamQueueMQ_.getStats(sqEntryCountMQ);
00137   memoryUsedInStreamQueueMQ_.getStats(sqMemoryUsedMQ);
00138   poppedEventSizeMQ_.getStats(eventSizeMQ);
00139   diskWriterIdleTimeMQ_.getStats(dwIdleMQ);
00140   diskWriteSizeMQ_.getStats(diskWriteMQ);
00141   entriesInDQMEventQueueMQ_.getStats(dqEntryCountMQ);
00142   memoryUsedInDQMEventQueueMQ_.getStats(dqMemoryUsedMQ);
00143   poppedDQMEventSizeMQ_.getStats(dqmEventSizeMQ);
00144   dqmEventProcessorIdleTimeMQ_.getStats(dqmIdleMQ);
00145 
00146   stats.reset();
00147 
00148   smoothIdleTimes(fpIdleMQ);
00149   smoothIdleTimes(dwIdleMQ);
00150   smoothIdleTimes(dqmIdleMQ);
00151 
00152   utils::Duration_t relativeTime = fqEntryCountMQ.recentDuration;
00153   const int lowestBin = sampleCount<binCount_ ? binCount_-sampleCount : 0;
00154   for (int idx = (binCount_ - 1); idx >= lowestBin; --idx)
00155   {
00156     utils::Duration_t binDuration = fqEntryCountMQ.recentBinnedDurations[idx];
00157     relativeTime -= binDuration;
00158     if (binDuration < boost::posix_time::milliseconds(10)) continue; //avoid very short durations
00159 
00160     Stats::Snapshot snapshot;
00161 
00162     snapshot.duration = binDuration;
00163     snapshot.absoluteTime = fqEntryCountMQ.recentBinnedSnapshotTimes[idx];
00164 
00165     // memory pool usage
00166     snapshot.poolUsage = poolUsageMQ.recentBinnedSampleCounts[idx]>0 ? 
00167       poolUsageMQ.recentBinnedValueSums[idx]/poolUsageMQ.recentBinnedSampleCounts[idx] :
00168       0;
00169 
00170     // number of fragments in fragment queue
00171     snapshot.entriesInFragmentQueue = fqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00172       fqEntryCountMQ.recentBinnedValueSums[idx]/fqEntryCountMQ.recentBinnedSampleCounts[idx] :
00173       0;
00174 
00175     // memory usage in fragment queue
00176     snapshot.memoryUsedInFragmentQueue = fqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00177       fqMemoryUsedMQ.recentBinnedValueSums[idx]/fqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00178       0;
00179 
00180     // rate/bandwidth of fragments popped from fragment queue
00181     getRateAndBandwidth(fragSizeMQ, idx, snapshot.fragmentQueueRate, snapshot.fragmentQueueBandwidth);
00182 
00183     // number of events in fragment store
00184     snapshot.fragmentStoreSize = fsEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00185       fsEntryCountMQ.recentBinnedValueSums[idx]/fsEntryCountMQ.recentBinnedSampleCounts[idx]>0 :
00186       0;
00187 
00188     // memory usage in fragment store
00189     snapshot.fragmentStoreMemoryUsed = fsMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00190       fsMemoryUsedMQ.recentBinnedValueSums[idx]/fsMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00191       0;
00192 
00193     // number of events in stream queue
00194     snapshot.entriesInStreamQueue = sqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00195       sqEntryCountMQ.recentBinnedValueSums[idx]/sqEntryCountMQ.recentBinnedSampleCounts[idx]>0 :
00196       0;
00197 
00198     // memory usage in stream queue
00199     snapshot.memoryUsedInStreamQueue = sqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00200       sqMemoryUsedMQ.recentBinnedValueSums[idx]/sqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00201       0;
00202 
00203     // rate/bandwidth of events popped from stream queue
00204     getRateAndBandwidth(eventSizeMQ, idx, snapshot.streamQueueRate, snapshot.streamQueueBandwidth);
00205 
00206     // rate/bandwidth of events written to disk
00207     getRateAndBandwidth(diskWriteMQ, idx, snapshot.writtenEventsRate, snapshot.writtenEventsBandwidth);
00208 
00209     // number of dqm events in DQMEvent queue
00210     snapshot.entriesInDQMQueue = dqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00211       dqEntryCountMQ.recentBinnedValueSums[idx]/dqEntryCountMQ.recentBinnedSampleCounts[idx] :
00212       0;
00213 
00214     // memory usage in DQMEvent queue
00215     snapshot.memoryUsedInDQMQueue = dqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00216       dqMemoryUsedMQ.recentBinnedValueSums[idx]/dqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00217       0;
00218 
00219     // rate/bandwidth of dqm events popped from DQMEvent queue
00220     getRateAndBandwidth(dqmEventSizeMQ, idx, snapshot.dqmQueueRate, snapshot.dqmQueueBandwidth);
00221 
00222     // fragment processor thread busy percentage
00223     snapshot.fragmentProcessorBusy =
00224       calcBusyPercentage(fpIdleMQ, idx);
00225 
00226     // disk writer thread busy percentage
00227     snapshot.diskWriterBusy =
00228       calcBusyPercentage(dwIdleMQ, idx);
00229 
00230     // DQMEvent processor thread busy percentage
00231     snapshot.dqmEventProcessorBusy =
00232       calcBusyPercentage(dqmIdleMQ, idx);
00233 
00234     stats.average += snapshot;
00235     stats.snapshots.push_back(snapshot);
00236   }
00237 
00238   const size_t snapshotCount = stats.snapshots.size();
00239   if (snapshotCount > 0)
00240   {
00241     stats.average /= snapshotCount;
00242   }
00243 }
00244 
00245 
00246 void ThroughputMonitorCollection::smoothIdleTimes(MonitoredQuantity::Stats& stats) const
00247 {
00248   int index = binCount_ - 1;
00249   while (index >= 0)
00250   {
00251     index = smoothIdleTimesHelper(stats.recentBinnedValueSums,
00252                                   stats.recentBinnedDurations,
00253                                   index, index);
00254   }
00255 }
00256 
00257 
00258 int ThroughputMonitorCollection::smoothIdleTimesHelper
00259 (
00260   std::vector<double>& idleTimes,
00261   std::vector<utils::Duration_t>& durations,
00262   int firstIndex, int lastIndex
00263 ) const
00264 {
00265   int workingSize = lastIndex - firstIndex + 1;
00266   double idleTimeSum = 0;
00267   double durationSum = 0;
00268 
00269   for (int idx = firstIndex; idx <= lastIndex; ++idx)
00270   {
00271     idleTimeSum += idleTimes[idx];
00272     durationSum += utils::durationToSeconds(durations[idx]);
00273   }
00274 
00275   if (idleTimeSum > durationSum && firstIndex > 0)
00276   {
00277     return smoothIdleTimesHelper(idleTimes, durations, firstIndex-1, lastIndex);
00278   }
00279   else
00280   {
00281     if (lastIndex > firstIndex)
00282     {
00283       for (int idx = firstIndex; idx <= lastIndex; ++idx)
00284       {
00285         idleTimes[idx] = idleTimeSum / workingSize;
00286         durations[idx] = utils::secondsToDuration(durationSum / workingSize);
00287       }
00288     }
00289     return (firstIndex - 1);
00290   }
00291 }
00292 
00293 
00294 void ThroughputMonitorCollection::getRateAndBandwidth
00295 (
00296   MonitoredQuantity::Stats& stats,
00297   const int& idx,
00298   double& rate,
00299   double& bandwidth
00300 ) const
00301 {
00302   const double recentBinnedDuration = utils::durationToSeconds(stats.recentBinnedDurations[idx]);
00303   if (recentBinnedDuration > 0)
00304   {
00305     rate =
00306       stats.recentBinnedSampleCounts[idx] / recentBinnedDuration;
00307     
00308     bandwidth =
00309       stats.recentBinnedValueSums[idx] / (1024*1024) 
00310       / recentBinnedDuration;
00311   }
00312 }
00313 
00314 
00315 double ThroughputMonitorCollection::calcBusyPercentage
00316 (
00317   MonitoredQuantity::Stats& stats,
00318   const int& idx
00319 ) const
00320 {
00321   double busyPercentage;
00322   if (stats.recentBinnedSampleCounts[idx] == 0)
00323   {
00324     // the thread did not log any idle time
00325     busyPercentage = 100;
00326   }
00327   else if (stats.recentBinnedSampleCounts[idx] == 1)
00328   {
00329     // only one sample means that we waited a whole second on a queue
00330     // this should only happen if deq_timed_wait timeout >= statistics calculation period
00331     busyPercentage = 0;
00332   }
00333   else if (stats.recentBinnedValueSums[idx] <= utils::durationToSeconds(stats.recentBinnedDurations[idx]))
00334   {
00335     // the thread was busy while it was not idle during the whole reporting duration
00336     busyPercentage = 100.0 * (1.0 - (stats.recentBinnedValueSums[idx] /
00337         utils::durationToSeconds(stats.recentBinnedDurations[idx])));
00338   }
00339   else
00340   {
00341     // the process logged more idle time than the whole reporting duration
00342     // this can happen due to rounding issues.
00343     busyPercentage = 0;
00344   }
00345 
00346   return busyPercentage;
00347 }
00348 
00349 
00350 void ThroughputMonitorCollection::do_calculateStatistics()
00351 {
00352   calcPoolUsage();
00353 
00354   if (fragmentQueue_.get() != 0) {
00355     entriesInFragmentQueueMQ_.addSample(fragmentQueue_->size());
00356     memoryUsedInFragmentQueueMQ_.addSample( static_cast<double>(fragmentQueue_->used()) / (1024*1024) );
00357   }
00358   if (streamQueue_.get() != 0) {
00359     entriesInStreamQueueMQ_.addSample(streamQueue_->size());
00360     memoryUsedInStreamQueueMQ_.addSample( static_cast<double>(streamQueue_->used()) / (1024*1024) );
00361   }
00362   if (dqmEventQueue_.get() != 0) {
00363     entriesInDQMEventQueueMQ_.addSample(dqmEventQueue_->size());
00364     memoryUsedInDQMEventQueueMQ_.addSample( static_cast<double>(dqmEventQueue_->used()) / (1024*1024) );
00365   }
00366   entriesInFragmentStoreMQ_.addSample(currentFragmentStoreSize_);
00367   memoryUsedInFragmentStoreMQ_.addSample(currentFragmentStoreMemoryUsedMB_);
00368 
00369   entriesInFragmentQueueMQ_.calculateStatistics();
00370   memoryUsedInFragmentQueueMQ_.calculateStatistics();
00371   poppedFragmentSizeMQ_.calculateStatistics();
00372   fragmentProcessorIdleTimeMQ_.calculateStatistics();
00373   entriesInFragmentStoreMQ_.calculateStatistics();
00374   memoryUsedInFragmentStoreMQ_.calculateStatistics();
00375   entriesInStreamQueueMQ_.calculateStatistics();
00376   memoryUsedInStreamQueueMQ_.calculateStatistics();
00377   poppedEventSizeMQ_.calculateStatistics();
00378   diskWriterIdleTimeMQ_.calculateStatistics();
00379   diskWriteSizeMQ_.calculateStatistics();
00380   entriesInDQMEventQueueMQ_.calculateStatistics();
00381   memoryUsedInDQMEventQueueMQ_.calculateStatistics();
00382   poppedDQMEventSizeMQ_.calculateStatistics();
00383   dqmEventProcessorIdleTimeMQ_.calculateStatistics();
00384 }
00385 
00386 
00387 void ThroughputMonitorCollection::do_reset()
00388 {
00389   poolUsageMQ_.reset();
00390   entriesInFragmentQueueMQ_.reset();
00391   memoryUsedInFragmentQueueMQ_.reset();
00392   poppedFragmentSizeMQ_.reset();
00393   fragmentProcessorIdleTimeMQ_.reset();
00394   entriesInFragmentStoreMQ_.reset();
00395   memoryUsedInFragmentStoreMQ_.reset();
00396   entriesInStreamQueueMQ_.reset();
00397   memoryUsedInStreamQueueMQ_.reset();
00398   poppedEventSizeMQ_.reset();
00399   diskWriterIdleTimeMQ_.reset();
00400   diskWriteSizeMQ_.reset();
00401   entriesInDQMEventQueueMQ_.reset();
00402   memoryUsedInDQMEventQueueMQ_.reset();
00403   poppedDQMEventSizeMQ_.reset();
00404   dqmEventProcessorIdleTimeMQ_.reset();
00405 }
00406 
00407 
00408 void ThroughputMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00409 {
00410   infoSpaceItems.push_back(std::make_pair("poolUsage", &poolUsage_));
00411   infoSpaceItems.push_back(std::make_pair("entriesInFragmentQueue", &entriesInFragmentQueue_));
00412   infoSpaceItems.push_back(std::make_pair("memoryUsedInFragmentQueue", &memoryUsedInFragmentQueue_));
00413   infoSpaceItems.push_back(std::make_pair("fragmentQueueRate", &fragmentQueueRate_));
00414   infoSpaceItems.push_back(std::make_pair("fragmentQueueBandwidth", &fragmentQueueBandwidth_));
00415   infoSpaceItems.push_back(std::make_pair("fragmentStoreSize", &fragmentStoreSize_));
00416   infoSpaceItems.push_back(std::make_pair("fragmentStoreMemoryUsed", &fragmentStoreMemoryUsed_));
00417   infoSpaceItems.push_back(std::make_pair("entriesInStreamQueue", &entriesInStreamQueue_));
00418   infoSpaceItems.push_back(std::make_pair("memoryUsedInStreamQueue", &memoryUsedInStreamQueue_));
00419   infoSpaceItems.push_back(std::make_pair("streamQueueRate", &streamQueueRate_));
00420   infoSpaceItems.push_back(std::make_pair("streamQueueBandwidth", &streamQueueBandwidth_));
00421   infoSpaceItems.push_back(std::make_pair("writtenEventsRate", &writtenEventsRate_));
00422   infoSpaceItems.push_back(std::make_pair("writtenEventsBandwidth", &writtenEventsBandwidth_));
00423   infoSpaceItems.push_back(std::make_pair("entriesInDQMQueue", &entriesInDQMQueue_));
00424   infoSpaceItems.push_back(std::make_pair("memoryUsedInDQMQueue", &memoryUsedInDQMQueue_));
00425   infoSpaceItems.push_back(std::make_pair("dqmQueueRate", &dqmQueueRate_));
00426   infoSpaceItems.push_back(std::make_pair("dqmQueueBandwidth", &dqmQueueBandwidth_));
00427   infoSpaceItems.push_back(std::make_pair("fragmentProcessorBusy", &fragmentProcessorBusy_));
00428   infoSpaceItems.push_back(std::make_pair("diskWriterBusy", &diskWriterBusy_));
00429   infoSpaceItems.push_back(std::make_pair("dqmEventProcessorBusy", &dqmEventProcessorBusy_));
00430   infoSpaceItems.push_back(std::make_pair("averagingTime", &averagingTime_));
00431 }
00432 
00433 
00434 void ThroughputMonitorCollection::do_updateInfoSpaceItems()
00435 {
00436   Stats stats;
00437   getStats(stats, throuphputAveragingCycles_);
00438 
00439   poolUsage_ = static_cast<unsigned int>(stats.average.poolUsage);
00440   entriesInFragmentQueue_ = static_cast<unsigned int>(stats.average.entriesInFragmentQueue);
00441   memoryUsedInFragmentQueue_ = stats.average.memoryUsedInFragmentQueue;
00442   fragmentQueueRate_ = stats.average.fragmentQueueRate;
00443   fragmentQueueBandwidth_ = stats.average.fragmentQueueBandwidth;
00444   fragmentStoreSize_ = static_cast<unsigned int>(stats.average.fragmentStoreSize);
00445   fragmentStoreMemoryUsed_ = stats.average.fragmentStoreMemoryUsed;
00446   entriesInStreamQueue_ = static_cast<unsigned int>(stats.average.entriesInStreamQueue);
00447   memoryUsedInStreamQueue_ = stats.average.memoryUsedInStreamQueue;
00448   streamQueueRate_ = stats.average.streamQueueRate;
00449   streamQueueBandwidth_ = stats.average.streamQueueBandwidth;
00450   writtenEventsRate_ = stats.average.writtenEventsRate;
00451   writtenEventsBandwidth_ = stats.average.writtenEventsBandwidth;
00452   entriesInDQMQueue_ = static_cast<unsigned int>(stats.average.entriesInDQMQueue);
00453   memoryUsedInDQMQueue_ = stats.average.memoryUsedInDQMQueue;
00454   dqmQueueRate_ = stats.average.dqmQueueRate;
00455   dqmQueueBandwidth_ = stats.average.dqmQueueBandwidth;
00456   fragmentProcessorBusy_ = stats.average.fragmentProcessorBusy;
00457   diskWriterBusy_ = stats.average.diskWriterBusy;
00458   dqmEventProcessorBusy_ = stats.average.dqmEventProcessorBusy;
00459   averagingTime_ = utils::durationToSeconds(stats.average.duration);
00460 }
00461 
00462 
00463 ThroughputMonitorCollection::Stats::Snapshot::Snapshot() :
00464 duration(boost::posix_time::seconds(0)),
00465 poolUsage(0),
00466 entriesInFragmentQueue(0),
00467 memoryUsedInFragmentQueue(0),
00468 fragmentQueueRate(0),
00469 fragmentQueueBandwidth(0),
00470 fragmentStoreSize(0),
00471 fragmentStoreMemoryUsed(0),
00472 entriesInStreamQueue(0),
00473 memoryUsedInStreamQueue(0),
00474 streamQueueRate(0),
00475 streamQueueBandwidth(0),
00476 writtenEventsRate(0),
00477 writtenEventsBandwidth(0),
00478 entriesInDQMQueue(0),
00479 memoryUsedInDQMQueue(0),
00480 dqmQueueRate(0),
00481 dqmQueueBandwidth(0),
00482 fragmentProcessorBusy(0),
00483 diskWriterBusy(0),
00484 dqmEventProcessorBusy(0)
00485 {}
00486 
00487 
00488 ThroughputMonitorCollection::Stats::Snapshot
00489 ThroughputMonitorCollection::Stats::Snapshot::operator=(const Snapshot& other)
00490 {
00491   duration = other.duration;
00492   poolUsage = other.poolUsage;
00493   entriesInFragmentQueue = other.entriesInFragmentQueue;
00494   memoryUsedInFragmentQueue = other.memoryUsedInFragmentQueue;
00495   fragmentQueueRate = other.fragmentQueueRate;
00496   fragmentQueueBandwidth = other.fragmentQueueBandwidth;
00497   fragmentStoreSize = other.fragmentStoreSize;
00498   fragmentStoreMemoryUsed = other.fragmentStoreMemoryUsed;
00499   entriesInStreamQueue = other.entriesInStreamQueue;
00500   memoryUsedInStreamQueue = other.memoryUsedInStreamQueue;
00501   streamQueueRate = other.streamQueueRate;
00502   streamQueueBandwidth = other.streamQueueBandwidth;
00503   writtenEventsRate = other.writtenEventsRate;
00504   writtenEventsBandwidth = other.writtenEventsBandwidth;
00505   entriesInDQMQueue = other.entriesInDQMQueue;
00506   memoryUsedInDQMQueue = other.memoryUsedInDQMQueue;
00507   dqmQueueRate = other.dqmQueueRate;
00508   dqmQueueBandwidth = other.dqmQueueBandwidth;
00509   fragmentProcessorBusy = other.fragmentProcessorBusy;
00510   diskWriterBusy = other.diskWriterBusy;
00511   dqmEventProcessorBusy = other.dqmEventProcessorBusy;
00512 
00513   return *this;
00514 }
00515 
00516 
00517 ThroughputMonitorCollection::Stats::Snapshot
00518 ThroughputMonitorCollection::Stats::Snapshot::operator+=(const Snapshot& other)
00519 {
00520   duration += other.duration;
00521   poolUsage += other.poolUsage;
00522   entriesInFragmentQueue += other.entriesInFragmentQueue;
00523   memoryUsedInFragmentQueue += other.memoryUsedInFragmentQueue;
00524   fragmentQueueRate += other.fragmentQueueRate;
00525   fragmentQueueBandwidth += other.fragmentQueueBandwidth;
00526   fragmentStoreSize += other.fragmentStoreSize;
00527   fragmentStoreMemoryUsed += other.fragmentStoreMemoryUsed;
00528   entriesInStreamQueue += other.entriesInStreamQueue;
00529   memoryUsedInStreamQueue += other.memoryUsedInStreamQueue;
00530   streamQueueRate += other.streamQueueRate;
00531   streamQueueBandwidth += other.streamQueueBandwidth;
00532   writtenEventsRate += other.writtenEventsRate;
00533   writtenEventsBandwidth += other.writtenEventsBandwidth;
00534   entriesInDQMQueue += other.entriesInDQMQueue;
00535   memoryUsedInDQMQueue += other.memoryUsedInDQMQueue;
00536   dqmQueueRate += other.dqmQueueRate;
00537   dqmQueueBandwidth += other.dqmQueueBandwidth;
00538   fragmentProcessorBusy += other.fragmentProcessorBusy;
00539   diskWriterBusy += other.diskWriterBusy;
00540   dqmEventProcessorBusy += other.dqmEventProcessorBusy;
00541 
00542   return *this;
00543 }
00544 
00545 
00546 ThroughputMonitorCollection::Stats::Snapshot
00547 ThroughputMonitorCollection::Stats::Snapshot::operator/=(const double& value)
00548 {
00549   poolUsage /= value;
00550   entriesInFragmentQueue /= value;
00551   memoryUsedInFragmentQueue /= value;
00552   fragmentQueueRate /= value;
00553   fragmentQueueBandwidth /= value;
00554   fragmentStoreSize /= value;
00555   fragmentStoreMemoryUsed /= value;
00556   entriesInStreamQueue /= value;
00557   memoryUsedInStreamQueue /= value;
00558   streamQueueRate /= value;
00559   streamQueueBandwidth /= value;
00560   writtenEventsRate /= value;
00561   writtenEventsBandwidth /= value;
00562   entriesInDQMQueue /= value;
00563   memoryUsedInDQMQueue /= value;
00564   dqmQueueRate /= value;
00565   dqmQueueBandwidth /= value;
00566   fragmentProcessorBusy /= value;
00567   diskWriterBusy /= value;
00568   dqmEventProcessorBusy /= value;
00569 
00570   return *this;
00571 }
00572 
00573 
00574 void ThroughputMonitorCollection::Stats::reset()
00575 {
00576   snapshots.clear();
00577   Snapshot empty;
00578   average = empty;
00579 }
00580