00001
00003
00004 #include "EventFilter/StorageManager/interface/ThroughputMonitorCollection.h"
00005 #include "EventFilter/StorageManager/interface/Utils.h"
00006
00007 using namespace stor;
00008
00009 ThroughputMonitorCollection::ThroughputMonitorCollection
00010 (
00011 const utils::Duration_t& updateInterval,
00012 const unsigned int& throuphputAveragingCycles
00013 ) :
00014 MonitorCollection(updateInterval),
00015 binCount_(300),
00016 poolUsageMQ_(updateInterval, updateInterval*binCount_),
00017 entriesInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
00018 memoryUsedInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
00019 poppedFragmentSizeMQ_(updateInterval, updateInterval*binCount_),
00020 fragmentProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
00021 entriesInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
00022 memoryUsedInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
00023 entriesInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
00024 memoryUsedInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
00025 poppedEventSizeMQ_(updateInterval, updateInterval*binCount_),
00026 diskWriterIdleTimeMQ_(updateInterval, updateInterval*binCount_),
00027 diskWriteSizeMQ_(updateInterval, updateInterval*binCount_),
00028 entriesInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
00029 memoryUsedInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
00030 poppedDQMEventSizeMQ_(updateInterval, updateInterval*binCount_),
00031 dqmEventProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
00032 currentFragmentStoreSize_(0),
00033 currentFragmentStoreMemoryUsedMB_(0),
00034 throuphputAveragingCycles_(throuphputAveragingCycles),
00035 pool_(0)
00036 {}
00037
00038
00039 void ThroughputMonitorCollection::setMemoryPoolPointer(toolbox::mem::Pool* pool)
00040 {
00041 if ( ! pool_)
00042 pool_ = pool;
00043 }
00044
00045
00046 void ThroughputMonitorCollection::addPoppedFragmentSample(double dataSize)
00047 {
00048 poppedFragmentSizeMQ_.addSample(dataSize);
00049 }
00050
00051
00052 void ThroughputMonitorCollection::
00053 addFragmentProcessorIdleSample(utils::Duration_t idleTime)
00054 {
00055 fragmentProcessorIdleTimeMQ_.addSample(utils::durationToSeconds(idleTime));
00056 }
00057
00058
00059 void ThroughputMonitorCollection::addPoppedEventSample(double dataSize)
00060 {
00061 poppedEventSizeMQ_.addSample(dataSize);
00062 }
00063
00064
00065 void ThroughputMonitorCollection::
00066 addDiskWriterIdleSample(utils::Duration_t idleTime)
00067 {
00068 diskWriterIdleTimeMQ_.addSample(utils::durationToSeconds(idleTime));
00069 }
00070
00071
00072 void ThroughputMonitorCollection::addDiskWriteSample(double dataSize)
00073 {
00074 diskWriteSizeMQ_.addSample(dataSize);
00075 }
00076
00077
00078 void ThroughputMonitorCollection::addPoppedDQMEventSample(double dataSize)
00079 {
00080 poppedDQMEventSizeMQ_.addSample(dataSize);
00081 }
00082
00083
00084 void ThroughputMonitorCollection::
00085 addDQMEventProcessorIdleSample(utils::Duration_t idleTime)
00086 {
00087 dqmEventProcessorIdleTimeMQ_.addSample(utils::durationToSeconds(idleTime));
00088 }
00089
00090
00091 void ThroughputMonitorCollection::calcPoolUsage()
00092 {
00093 if (pool_)
00094 {
00095 try {
00096 pool_->lock();
00097 poolUsageMQ_.addSample(pool_->getMemoryUsage().getUsed());
00098 pool_->unlock();
00099 }
00100 catch (...)
00101 {
00102 pool_->unlock();
00103 }
00104 }
00105 poolUsageMQ_.calculateStatistics();
00106 }
00107
00108
00109 void ThroughputMonitorCollection::getStats(Stats& stats) const
00110 {
00111 boost::mutex::scoped_lock sl(statsMutex_);
00112 do_getStats(stats, binCount_);
00113 }
00114
00115
00116 void ThroughputMonitorCollection::getStats(Stats& stats, const unsigned int sampleCount) const
00117 {
00118 boost::mutex::scoped_lock sl(statsMutex_);
00119 do_getStats(stats, sampleCount);
00120 }
00121
00122
00123 void ThroughputMonitorCollection::do_getStats(Stats& stats, const unsigned int sampleCount) const
00124 {
00125 MonitoredQuantity::Stats fqEntryCountMQ, fqMemoryUsedMQ, fragSizeMQ;
00126 MonitoredQuantity::Stats fpIdleMQ, fsEntryCountMQ, fsMemoryUsedMQ;
00127 MonitoredQuantity::Stats sqEntryCountMQ, sqMemoryUsedMQ, eventSizeMQ, dwIdleMQ, diskWriteMQ;
00128 MonitoredQuantity::Stats dqEntryCountMQ, dqMemoryUsedMQ, dqmEventSizeMQ, dqmIdleMQ, poolUsageMQ;
00129 poolUsageMQ_.getStats(poolUsageMQ);
00130 entriesInFragmentQueueMQ_.getStats(fqEntryCountMQ);
00131 memoryUsedInFragmentQueueMQ_.getStats(fqMemoryUsedMQ);
00132 poppedFragmentSizeMQ_.getStats(fragSizeMQ);
00133 fragmentProcessorIdleTimeMQ_.getStats(fpIdleMQ);
00134 entriesInFragmentStoreMQ_.getStats(fsEntryCountMQ);
00135 memoryUsedInFragmentStoreMQ_.getStats(fsMemoryUsedMQ);
00136 entriesInStreamQueueMQ_.getStats(sqEntryCountMQ);
00137 memoryUsedInStreamQueueMQ_.getStats(sqMemoryUsedMQ);
00138 poppedEventSizeMQ_.getStats(eventSizeMQ);
00139 diskWriterIdleTimeMQ_.getStats(dwIdleMQ);
00140 diskWriteSizeMQ_.getStats(diskWriteMQ);
00141 entriesInDQMEventQueueMQ_.getStats(dqEntryCountMQ);
00142 memoryUsedInDQMEventQueueMQ_.getStats(dqMemoryUsedMQ);
00143 poppedDQMEventSizeMQ_.getStats(dqmEventSizeMQ);
00144 dqmEventProcessorIdleTimeMQ_.getStats(dqmIdleMQ);
00145
00146 stats.reset();
00147
00148 smoothIdleTimes(fpIdleMQ);
00149 smoothIdleTimes(dwIdleMQ);
00150 smoothIdleTimes(dqmIdleMQ);
00151
00152 utils::Duration_t relativeTime = fqEntryCountMQ.recentDuration;
00153 const int lowestBin = sampleCount<binCount_ ? binCount_-sampleCount : 0;
00154 for (int idx = (binCount_ - 1); idx >= lowestBin; --idx)
00155 {
00156 utils::Duration_t binDuration = fqEntryCountMQ.recentBinnedDurations[idx];
00157 relativeTime -= binDuration;
00158 if (binDuration < boost::posix_time::milliseconds(10)) continue;
00159
00160 Stats::Snapshot snapshot;
00161
00162 snapshot.duration = binDuration;
00163 snapshot.absoluteTime = fqEntryCountMQ.recentBinnedSnapshotTimes[idx];
00164
00165
00166 snapshot.poolUsage = poolUsageMQ.recentBinnedSampleCounts[idx]>0 ?
00167 poolUsageMQ.recentBinnedValueSums[idx]/poolUsageMQ.recentBinnedSampleCounts[idx] :
00168 0;
00169
00170
00171 snapshot.entriesInFragmentQueue = fqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00172 fqEntryCountMQ.recentBinnedValueSums[idx]/fqEntryCountMQ.recentBinnedSampleCounts[idx] :
00173 0;
00174
00175
00176 snapshot.memoryUsedInFragmentQueue = fqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00177 fqMemoryUsedMQ.recentBinnedValueSums[idx]/fqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00178 0;
00179
00180
00181 getRateAndBandwidth(fragSizeMQ, idx, snapshot.fragmentQueueRate, snapshot.fragmentQueueBandwidth);
00182
00183
00184 snapshot.fragmentStoreSize = fsEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00185 fsEntryCountMQ.recentBinnedValueSums[idx]/fsEntryCountMQ.recentBinnedSampleCounts[idx]>0 :
00186 0;
00187
00188
00189 snapshot.fragmentStoreMemoryUsed = fsMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00190 fsMemoryUsedMQ.recentBinnedValueSums[idx]/fsMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00191 0;
00192
00193
00194 snapshot.entriesInStreamQueue = sqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00195 sqEntryCountMQ.recentBinnedValueSums[idx]/sqEntryCountMQ.recentBinnedSampleCounts[idx]>0 :
00196 0;
00197
00198
00199 snapshot.memoryUsedInStreamQueue = sqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00200 sqMemoryUsedMQ.recentBinnedValueSums[idx]/sqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00201 0;
00202
00203
00204 getRateAndBandwidth(eventSizeMQ, idx, snapshot.streamQueueRate, snapshot.streamQueueBandwidth);
00205
00206
00207 getRateAndBandwidth(diskWriteMQ, idx, snapshot.writtenEventsRate, snapshot.writtenEventsBandwidth);
00208
00209
00210 snapshot.entriesInDQMQueue = dqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
00211 dqEntryCountMQ.recentBinnedValueSums[idx]/dqEntryCountMQ.recentBinnedSampleCounts[idx] :
00212 0;
00213
00214
00215 snapshot.memoryUsedInDQMQueue = dqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
00216 dqMemoryUsedMQ.recentBinnedValueSums[idx]/dqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
00217 0;
00218
00219
00220 getRateAndBandwidth(dqmEventSizeMQ, idx, snapshot.dqmQueueRate, snapshot.dqmQueueBandwidth);
00221
00222
00223 snapshot.fragmentProcessorBusy =
00224 calcBusyPercentage(fpIdleMQ, idx);
00225
00226
00227 snapshot.diskWriterBusy =
00228 calcBusyPercentage(dwIdleMQ, idx);
00229
00230
00231 snapshot.dqmEventProcessorBusy =
00232 calcBusyPercentage(dqmIdleMQ, idx);
00233
00234 stats.average += snapshot;
00235 stats.snapshots.push_back(snapshot);
00236 }
00237
00238 const size_t snapshotCount = stats.snapshots.size();
00239 if (snapshotCount > 0)
00240 {
00241 stats.average /= snapshotCount;
00242 }
00243 }
00244
00245
00246 void ThroughputMonitorCollection::smoothIdleTimes(MonitoredQuantity::Stats& stats) const
00247 {
00248 int index = binCount_ - 1;
00249 while (index >= 0)
00250 {
00251 index = smoothIdleTimesHelper(stats.recentBinnedValueSums,
00252 stats.recentBinnedDurations,
00253 index, index);
00254 }
00255 }
00256
00257
00258 int ThroughputMonitorCollection::smoothIdleTimesHelper
00259 (
00260 std::vector<double>& idleTimes,
00261 std::vector<utils::Duration_t>& durations,
00262 int firstIndex, int lastIndex
00263 ) const
00264 {
00265 int workingSize = lastIndex - firstIndex + 1;
00266 double idleTimeSum = 0;
00267 double durationSum = 0;
00268
00269 for (int idx = firstIndex; idx <= lastIndex; ++idx)
00270 {
00271 idleTimeSum += idleTimes[idx];
00272 durationSum += utils::durationToSeconds(durations[idx]);
00273 }
00274
00275 if (idleTimeSum > durationSum && firstIndex > 0)
00276 {
00277 return smoothIdleTimesHelper(idleTimes, durations, firstIndex-1, lastIndex);
00278 }
00279 else
00280 {
00281 if (lastIndex > firstIndex)
00282 {
00283 for (int idx = firstIndex; idx <= lastIndex; ++idx)
00284 {
00285 idleTimes[idx] = idleTimeSum / workingSize;
00286 durations[idx] = utils::secondsToDuration(durationSum / workingSize);
00287 }
00288 }
00289 return (firstIndex - 1);
00290 }
00291 }
00292
00293
00294 void ThroughputMonitorCollection::getRateAndBandwidth
00295 (
00296 MonitoredQuantity::Stats& stats,
00297 const int& idx,
00298 double& rate,
00299 double& bandwidth
00300 ) const
00301 {
00302 const double recentBinnedDuration = utils::durationToSeconds(stats.recentBinnedDurations[idx]);
00303 if (recentBinnedDuration > 0)
00304 {
00305 rate =
00306 stats.recentBinnedSampleCounts[idx] / recentBinnedDuration;
00307
00308 bandwidth =
00309 stats.recentBinnedValueSums[idx] / (1024*1024)
00310 / recentBinnedDuration;
00311 }
00312 }
00313
00314
00315 double ThroughputMonitorCollection::calcBusyPercentage
00316 (
00317 MonitoredQuantity::Stats& stats,
00318 const int& idx
00319 ) const
00320 {
00321 double busyPercentage;
00322 if (stats.recentBinnedSampleCounts[idx] == 0)
00323 {
00324
00325 busyPercentage = 100;
00326 }
00327 else if (stats.recentBinnedSampleCounts[idx] == 1)
00328 {
00329
00330
00331 busyPercentage = 0;
00332 }
00333 else if (stats.recentBinnedValueSums[idx] <= utils::durationToSeconds(stats.recentBinnedDurations[idx]))
00334 {
00335
00336 busyPercentage = 100.0 * (1.0 - (stats.recentBinnedValueSums[idx] /
00337 utils::durationToSeconds(stats.recentBinnedDurations[idx])));
00338 }
00339 else
00340 {
00341
00342
00343 busyPercentage = 0;
00344 }
00345
00346 return busyPercentage;
00347 }
00348
00349
00350 void ThroughputMonitorCollection::do_calculateStatistics()
00351 {
00352 calcPoolUsage();
00353
00354 if (fragmentQueue_.get() != 0) {
00355 entriesInFragmentQueueMQ_.addSample(fragmentQueue_->size());
00356 memoryUsedInFragmentQueueMQ_.addSample( static_cast<double>(fragmentQueue_->used()) / (1024*1024) );
00357 }
00358 if (streamQueue_.get() != 0) {
00359 entriesInStreamQueueMQ_.addSample(streamQueue_->size());
00360 memoryUsedInStreamQueueMQ_.addSample( static_cast<double>(streamQueue_->used()) / (1024*1024) );
00361 }
00362 if (dqmEventQueue_.get() != 0) {
00363 entriesInDQMEventQueueMQ_.addSample(dqmEventQueue_->size());
00364 memoryUsedInDQMEventQueueMQ_.addSample( static_cast<double>(dqmEventQueue_->used()) / (1024*1024) );
00365 }
00366 entriesInFragmentStoreMQ_.addSample(currentFragmentStoreSize_);
00367 memoryUsedInFragmentStoreMQ_.addSample(currentFragmentStoreMemoryUsedMB_);
00368
00369 entriesInFragmentQueueMQ_.calculateStatistics();
00370 memoryUsedInFragmentQueueMQ_.calculateStatistics();
00371 poppedFragmentSizeMQ_.calculateStatistics();
00372 fragmentProcessorIdleTimeMQ_.calculateStatistics();
00373 entriesInFragmentStoreMQ_.calculateStatistics();
00374 memoryUsedInFragmentStoreMQ_.calculateStatistics();
00375 entriesInStreamQueueMQ_.calculateStatistics();
00376 memoryUsedInStreamQueueMQ_.calculateStatistics();
00377 poppedEventSizeMQ_.calculateStatistics();
00378 diskWriterIdleTimeMQ_.calculateStatistics();
00379 diskWriteSizeMQ_.calculateStatistics();
00380 entriesInDQMEventQueueMQ_.calculateStatistics();
00381 memoryUsedInDQMEventQueueMQ_.calculateStatistics();
00382 poppedDQMEventSizeMQ_.calculateStatistics();
00383 dqmEventProcessorIdleTimeMQ_.calculateStatistics();
00384 }
00385
00386
00387 void ThroughputMonitorCollection::do_reset()
00388 {
00389 poolUsageMQ_.reset();
00390 entriesInFragmentQueueMQ_.reset();
00391 memoryUsedInFragmentQueueMQ_.reset();
00392 poppedFragmentSizeMQ_.reset();
00393 fragmentProcessorIdleTimeMQ_.reset();
00394 entriesInFragmentStoreMQ_.reset();
00395 memoryUsedInFragmentStoreMQ_.reset();
00396 entriesInStreamQueueMQ_.reset();
00397 memoryUsedInStreamQueueMQ_.reset();
00398 poppedEventSizeMQ_.reset();
00399 diskWriterIdleTimeMQ_.reset();
00400 diskWriteSizeMQ_.reset();
00401 entriesInDQMEventQueueMQ_.reset();
00402 memoryUsedInDQMEventQueueMQ_.reset();
00403 poppedDQMEventSizeMQ_.reset();
00404 dqmEventProcessorIdleTimeMQ_.reset();
00405 }
00406
00407
00408 void ThroughputMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00409 {
00410 infoSpaceItems.push_back(std::make_pair("poolUsage", &poolUsage_));
00411 infoSpaceItems.push_back(std::make_pair("entriesInFragmentQueue", &entriesInFragmentQueue_));
00412 infoSpaceItems.push_back(std::make_pair("memoryUsedInFragmentQueue", &memoryUsedInFragmentQueue_));
00413 infoSpaceItems.push_back(std::make_pair("fragmentQueueRate", &fragmentQueueRate_));
00414 infoSpaceItems.push_back(std::make_pair("fragmentQueueBandwidth", &fragmentQueueBandwidth_));
00415 infoSpaceItems.push_back(std::make_pair("fragmentStoreSize", &fragmentStoreSize_));
00416 infoSpaceItems.push_back(std::make_pair("fragmentStoreMemoryUsed", &fragmentStoreMemoryUsed_));
00417 infoSpaceItems.push_back(std::make_pair("entriesInStreamQueue", &entriesInStreamQueue_));
00418 infoSpaceItems.push_back(std::make_pair("memoryUsedInStreamQueue", &memoryUsedInStreamQueue_));
00419 infoSpaceItems.push_back(std::make_pair("streamQueueRate", &streamQueueRate_));
00420 infoSpaceItems.push_back(std::make_pair("streamQueueBandwidth", &streamQueueBandwidth_));
00421 infoSpaceItems.push_back(std::make_pair("writtenEventsRate", &writtenEventsRate_));
00422 infoSpaceItems.push_back(std::make_pair("writtenEventsBandwidth", &writtenEventsBandwidth_));
00423 infoSpaceItems.push_back(std::make_pair("entriesInDQMQueue", &entriesInDQMQueue_));
00424 infoSpaceItems.push_back(std::make_pair("memoryUsedInDQMQueue", &memoryUsedInDQMQueue_));
00425 infoSpaceItems.push_back(std::make_pair("dqmQueueRate", &dqmQueueRate_));
00426 infoSpaceItems.push_back(std::make_pair("dqmQueueBandwidth", &dqmQueueBandwidth_));
00427 infoSpaceItems.push_back(std::make_pair("fragmentProcessorBusy", &fragmentProcessorBusy_));
00428 infoSpaceItems.push_back(std::make_pair("diskWriterBusy", &diskWriterBusy_));
00429 infoSpaceItems.push_back(std::make_pair("dqmEventProcessorBusy", &dqmEventProcessorBusy_));
00430 infoSpaceItems.push_back(std::make_pair("averagingTime", &averagingTime_));
00431 }
00432
00433
00434 void ThroughputMonitorCollection::do_updateInfoSpaceItems()
00435 {
00436 Stats stats;
00437 getStats(stats, throuphputAveragingCycles_);
00438
00439 poolUsage_ = static_cast<unsigned int>(stats.average.poolUsage);
00440 entriesInFragmentQueue_ = static_cast<unsigned int>(stats.average.entriesInFragmentQueue);
00441 memoryUsedInFragmentQueue_ = stats.average.memoryUsedInFragmentQueue;
00442 fragmentQueueRate_ = stats.average.fragmentQueueRate;
00443 fragmentQueueBandwidth_ = stats.average.fragmentQueueBandwidth;
00444 fragmentStoreSize_ = static_cast<unsigned int>(stats.average.fragmentStoreSize);
00445 fragmentStoreMemoryUsed_ = stats.average.fragmentStoreMemoryUsed;
00446 entriesInStreamQueue_ = static_cast<unsigned int>(stats.average.entriesInStreamQueue);
00447 memoryUsedInStreamQueue_ = stats.average.memoryUsedInStreamQueue;
00448 streamQueueRate_ = stats.average.streamQueueRate;
00449 streamQueueBandwidth_ = stats.average.streamQueueBandwidth;
00450 writtenEventsRate_ = stats.average.writtenEventsRate;
00451 writtenEventsBandwidth_ = stats.average.writtenEventsBandwidth;
00452 entriesInDQMQueue_ = static_cast<unsigned int>(stats.average.entriesInDQMQueue);
00453 memoryUsedInDQMQueue_ = stats.average.memoryUsedInDQMQueue;
00454 dqmQueueRate_ = stats.average.dqmQueueRate;
00455 dqmQueueBandwidth_ = stats.average.dqmQueueBandwidth;
00456 fragmentProcessorBusy_ = stats.average.fragmentProcessorBusy;
00457 diskWriterBusy_ = stats.average.diskWriterBusy;
00458 dqmEventProcessorBusy_ = stats.average.dqmEventProcessorBusy;
00459 averagingTime_ = utils::durationToSeconds(stats.average.duration);
00460 }
00461
00462
00463 ThroughputMonitorCollection::Stats::Snapshot::Snapshot() :
00464 duration(boost::posix_time::seconds(0)),
00465 poolUsage(0),
00466 entriesInFragmentQueue(0),
00467 memoryUsedInFragmentQueue(0),
00468 fragmentQueueRate(0),
00469 fragmentQueueBandwidth(0),
00470 fragmentStoreSize(0),
00471 fragmentStoreMemoryUsed(0),
00472 entriesInStreamQueue(0),
00473 memoryUsedInStreamQueue(0),
00474 streamQueueRate(0),
00475 streamQueueBandwidth(0),
00476 writtenEventsRate(0),
00477 writtenEventsBandwidth(0),
00478 entriesInDQMQueue(0),
00479 memoryUsedInDQMQueue(0),
00480 dqmQueueRate(0),
00481 dqmQueueBandwidth(0),
00482 fragmentProcessorBusy(0),
00483 diskWriterBusy(0),
00484 dqmEventProcessorBusy(0)
00485 {}
00486
00487
00488 ThroughputMonitorCollection::Stats::Snapshot
00489 ThroughputMonitorCollection::Stats::Snapshot::operator=(const Snapshot& other)
00490 {
00491 duration = other.duration;
00492 poolUsage = other.poolUsage;
00493 entriesInFragmentQueue = other.entriesInFragmentQueue;
00494 memoryUsedInFragmentQueue = other.memoryUsedInFragmentQueue;
00495 fragmentQueueRate = other.fragmentQueueRate;
00496 fragmentQueueBandwidth = other.fragmentQueueBandwidth;
00497 fragmentStoreSize = other.fragmentStoreSize;
00498 fragmentStoreMemoryUsed = other.fragmentStoreMemoryUsed;
00499 entriesInStreamQueue = other.entriesInStreamQueue;
00500 memoryUsedInStreamQueue = other.memoryUsedInStreamQueue;
00501 streamQueueRate = other.streamQueueRate;
00502 streamQueueBandwidth = other.streamQueueBandwidth;
00503 writtenEventsRate = other.writtenEventsRate;
00504 writtenEventsBandwidth = other.writtenEventsBandwidth;
00505 entriesInDQMQueue = other.entriesInDQMQueue;
00506 memoryUsedInDQMQueue = other.memoryUsedInDQMQueue;
00507 dqmQueueRate = other.dqmQueueRate;
00508 dqmQueueBandwidth = other.dqmQueueBandwidth;
00509 fragmentProcessorBusy = other.fragmentProcessorBusy;
00510 diskWriterBusy = other.diskWriterBusy;
00511 dqmEventProcessorBusy = other.dqmEventProcessorBusy;
00512
00513 return *this;
00514 }
00515
00516
00517 ThroughputMonitorCollection::Stats::Snapshot
00518 ThroughputMonitorCollection::Stats::Snapshot::operator+=(const Snapshot& other)
00519 {
00520 duration += other.duration;
00521 poolUsage += other.poolUsage;
00522 entriesInFragmentQueue += other.entriesInFragmentQueue;
00523 memoryUsedInFragmentQueue += other.memoryUsedInFragmentQueue;
00524 fragmentQueueRate += other.fragmentQueueRate;
00525 fragmentQueueBandwidth += other.fragmentQueueBandwidth;
00526 fragmentStoreSize += other.fragmentStoreSize;
00527 fragmentStoreMemoryUsed += other.fragmentStoreMemoryUsed;
00528 entriesInStreamQueue += other.entriesInStreamQueue;
00529 memoryUsedInStreamQueue += other.memoryUsedInStreamQueue;
00530 streamQueueRate += other.streamQueueRate;
00531 streamQueueBandwidth += other.streamQueueBandwidth;
00532 writtenEventsRate += other.writtenEventsRate;
00533 writtenEventsBandwidth += other.writtenEventsBandwidth;
00534 entriesInDQMQueue += other.entriesInDQMQueue;
00535 memoryUsedInDQMQueue += other.memoryUsedInDQMQueue;
00536 dqmQueueRate += other.dqmQueueRate;
00537 dqmQueueBandwidth += other.dqmQueueBandwidth;
00538 fragmentProcessorBusy += other.fragmentProcessorBusy;
00539 diskWriterBusy += other.diskWriterBusy;
00540 dqmEventProcessorBusy += other.dqmEventProcessorBusy;
00541
00542 return *this;
00543 }
00544
00545
00546 ThroughputMonitorCollection::Stats::Snapshot
00547 ThroughputMonitorCollection::Stats::Snapshot::operator/=(const double& value)
00548 {
00549 poolUsage /= value;
00550 entriesInFragmentQueue /= value;
00551 memoryUsedInFragmentQueue /= value;
00552 fragmentQueueRate /= value;
00553 fragmentQueueBandwidth /= value;
00554 fragmentStoreSize /= value;
00555 fragmentStoreMemoryUsed /= value;
00556 entriesInStreamQueue /= value;
00557 memoryUsedInStreamQueue /= value;
00558 streamQueueRate /= value;
00559 streamQueueBandwidth /= value;
00560 writtenEventsRate /= value;
00561 writtenEventsBandwidth /= value;
00562 entriesInDQMQueue /= value;
00563 memoryUsedInDQMQueue /= value;
00564 dqmQueueRate /= value;
00565 dqmQueueBandwidth /= value;
00566 fragmentProcessorBusy /= value;
00567 diskWriterBusy /= value;
00568 dqmEventProcessorBusy /= value;
00569
00570 return *this;
00571 }
00572
00573
00574 void ThroughputMonitorCollection::Stats::reset()
00575 {
00576 snapshots.clear();
00577 Snapshot empty;
00578 average = empty;
00579 }
00580