CMS 3D CMS Logo

Classes | Public Member Functions | Private Member Functions | Private Attributes

stor::ThroughputMonitorCollection Class Reference

#include <ThroughputMonitorCollection.h>

Inheritance diagram for stor::ThroughputMonitorCollection:
stor::MonitorCollection

List of all members.

Classes

struct  Stats

Public Member Functions

void addDiskWriterIdleSample (utils::Duration_t idleTime)
void addDiskWriteSample (double dataSize)
void addDQMEventProcessorIdleSample (utils::Duration_t idleTime)
void addFragmentProcessorIdleSample (utils::Duration_t idleTime)
void addPoppedDQMEventSample (double dataSize)
void addPoppedEventSample (double dataSize)
void addPoppedFragmentSample (double dataSize)
int getBinCount () const
const MonitoredQuantity & getDiskWriteMQ () const
MonitoredQuantity & getDiskWriteMQ ()
const MonitoredQuantity & getDiskWriterIdleMQ () const
MonitoredQuantity & getDiskWriterIdleMQ ()
const MonitoredQuantity & getDQMEventProcessorIdleMQ () const
MonitoredQuantity & getDQMEventProcessorIdleMQ ()
const MonitoredQuantity & getDQMEventQueueEntryCountMQ () const
MonitoredQuantity & getDQMEventQueueEntryCountMQ ()
const MonitoredQuantity & getDQMEventQueueMemoryUsedMQ () const
MonitoredQuantity & getDQMEventQueueMemoryUsedMQ ()
MonitoredQuantity & getFragmentProcessorIdleMQ ()
const MonitoredQuantity & getFragmentProcessorIdleMQ () const
MonitoredQuantity & getFragmentQueueEntryCountMQ ()
const MonitoredQuantity & getFragmentQueueEntryCountMQ () const
const MonitoredQuantity & getFragmentQueueMemoryUsedMQ () const
MonitoredQuantity & getFragmentQueueMemoryUsedMQ ()
const MonitoredQuantity & getFragmentStoreEntryCountMQ () const
MonitoredQuantity & getFragmentStoreEntryCountMQ ()
const MonitoredQuantity & getFragmentStoreMemoryUsedMQ () const
MonitoredQuantity & getFragmentStoreMemoryUsedMQ ()
const MonitoredQuantity & getPoolUsageMQ () const
MonitoredQuantity & getPoolUsageMQ ()
MonitoredQuantity & getPoppedDQMEventSizeMQ ()
const MonitoredQuantity & getPoppedDQMEventSizeMQ () const
const MonitoredQuantity & getPoppedEventSizeMQ () const
MonitoredQuantity & getPoppedEventSizeMQ ()
const MonitoredQuantity & getPoppedFragmentSizeMQ () const
MonitoredQuantity & getPoppedFragmentSizeMQ ()
void getStats (Stats &, const unsigned int sampleCount) const
void getStats (Stats &) const
MonitoredQuantity & getStreamQueueEntryCountMQ ()
const MonitoredQuantity & getStreamQueueEntryCountMQ () const
const MonitoredQuantity & getStreamQueueMemoryUsedMQ () const
MonitoredQuantity & getStreamQueueMemoryUsedMQ ()
void setDQMEventQueue (DQMEventQueuePtr dqmEventQueue)
void setFragmentQueue (FragmentQueuePtr fragmentQueue)
void setFragmentStoreMemoryUsed (size_t memoryUsed)
void setFragmentStoreSize (unsigned int size)
void setMemoryPoolPointer (toolbox::mem::Pool *)
void setStreamQueue (StreamQueuePtr streamQueue)
 ThroughputMonitorCollection (const utils::Duration_t &updateInterval, const unsigned int &throuphputAveragingCycles)

Private Member Functions

double calcBusyPercentage (MonitoredQuantity::Stats &, const int &idx) const
void calcPoolUsage ()
virtual void do_appendInfoSpaceItems (InfoSpaceItems &)
virtual void do_calculateStatistics ()
void do_getStats (Stats &, const unsigned int sampleCount) const
virtual void do_reset ()
virtual void do_updateInfoSpaceItems ()
void getRateAndBandwidth (MonitoredQuantity::Stats &stats, const int &idx, double &rate, double &bandwidth) const
ThroughputMonitorCollection & operator= (ThroughputMonitorCollection const &)
void smoothIdleTimes (MonitoredQuantity::Stats &) const
int smoothIdleTimesHelper (std::vector< double > &idleTimes, std::vector< utils::Duration_t > &durations, int firstIndex, int lastIndex) const
 ThroughputMonitorCollection (ThroughputMonitorCollection const &)

Private Attributes

xdata::Double averagingTime_
const unsigned int binCount_
double currentFragmentStoreMemoryUsedMB_
unsigned int currentFragmentStoreSize_
xdata::Double diskWriterBusy_
MonitoredQuantity diskWriterIdleTimeMQ_
MonitoredQuantity diskWriteSizeMQ_
xdata::Double dqmEventProcessorBusy_
MonitoredQuantity dqmEventProcessorIdleTimeMQ_
DQMEventQueuePtr dqmEventQueue_
xdata::Double dqmQueueBandwidth_
xdata::Double dqmQueueRate_
MonitoredQuantity entriesInDQMEventQueueMQ_
xdata::UnsignedInteger32 entriesInDQMQueue_
xdata::UnsignedInteger32 entriesInFragmentQueue_
MonitoredQuantity entriesInFragmentQueueMQ_
MonitoredQuantity entriesInFragmentStoreMQ_
xdata::UnsignedInteger32 entriesInStreamQueue_
MonitoredQuantity entriesInStreamQueueMQ_
xdata::Double fragmentProcessorBusy_
MonitoredQuantity fragmentProcessorIdleTimeMQ_
FragmentQueuePtr fragmentQueue_
xdata::Double fragmentQueueBandwidth_
xdata::Double fragmentQueueRate_
xdata::Double fragmentStoreMemoryUsed_
xdata::UnsignedInteger32 fragmentStoreSize_
MonitoredQuantity memoryUsedInDQMEventQueueMQ_
xdata::Double memoryUsedInDQMQueue_
xdata::Double memoryUsedInFragmentQueue_
MonitoredQuantity memoryUsedInFragmentQueueMQ_
MonitoredQuantity memoryUsedInFragmentStoreMQ_
xdata::Double memoryUsedInStreamQueue_
MonitoredQuantity memoryUsedInStreamQueueMQ_
toolbox::mem::Pool * pool_
xdata::UnsignedInteger32 poolUsage_
MonitoredQuantity poolUsageMQ_
MonitoredQuantity poppedDQMEventSizeMQ_
MonitoredQuantity poppedEventSizeMQ_
MonitoredQuantity poppedFragmentSizeMQ_
boost::mutex statsMutex_
StreamQueuePtr streamQueue_
xdata::Double streamQueueBandwidth_
xdata::Double streamQueueRate_
unsigned int throuphputAveragingCycles_
xdata::Double writtenEventsBandwidth_
xdata::Double writtenEventsRate_

Detailed Description

A collection of MonitoredQuantities to track the flow of data through the storage manager.

Author:
mommsen
Revision:
1.19
Date:
2011/03/07 15:31:32

Definition at line 31 of file ThroughputMonitorCollection.h.


Constructor & Destructor Documentation

ThroughputMonitorCollection::ThroughputMonitorCollection ( const utils::Duration_t &  updateInterval,
const unsigned int &  throuphputAveragingCycles 
) [explicit]

Definition at line 10 of file ThroughputMonitorCollection.cc.

  :
  // Base class receives the update interval unchanged.
  MonitorCollection(updateInterval),
  // Number of bins; also scales the second constructor argument of every
  // MonitoredQuantity below.
  binCount_(300),
  // NOTE(review): the initializers below read binCount_. Members are
  // initialized in declaration order, so binCount_ has to be declared before
  // all of these MonitoredQuantity members -- TODO confirm in the header.
  // Each quantity is built from (updateInterval, updateInterval*binCount_).
  poolUsageMQ_(updateInterval, updateInterval*binCount_),
  entriesInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
  memoryUsedInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
  poppedFragmentSizeMQ_(updateInterval, updateInterval*binCount_),
  fragmentProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
  entriesInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
  memoryUsedInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
  entriesInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
  memoryUsedInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
  poppedEventSizeMQ_(updateInterval, updateInterval*binCount_),
  diskWriterIdleTimeMQ_(updateInterval, updateInterval*binCount_),
  diskWriteSizeMQ_(updateInterval, updateInterval*binCount_),
  entriesInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
  memoryUsedInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
  poppedDQMEventSizeMQ_(updateInterval, updateInterval*binCount_),
  dqmEventProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
  // Fragment store bookkeeping starts out empty.
  currentFragmentStoreSize_(0),
  currentFragmentStoreMemoryUsedMB_(0),
  // Number of most recent snapshots averaged in do_updateInfoSpaceItems().
  throuphputAveragingCycles_(throuphputAveragingCycles),
  // No memory pool is known yet; setMemoryPoolPointer() fills this in later.
  pool_(0)
{}
stor::ThroughputMonitorCollection::ThroughputMonitorCollection ( ThroughputMonitorCollection const &  ) [private]

Member Function Documentation

void ThroughputMonitorCollection::addDiskWriterIdleSample ( utils::Duration_t  idleTime)
void ThroughputMonitorCollection::addDiskWriteSample ( double  dataSize)
void ThroughputMonitorCollection::addDQMEventProcessorIdleSample ( utils::Duration_t  idleTime)
void ThroughputMonitorCollection::addFragmentProcessorIdleSample ( utils::Duration_t  idleTime)
void ThroughputMonitorCollection::addPoppedDQMEventSample ( double  dataSize)
void ThroughputMonitorCollection::addPoppedEventSample ( double  dataSize)
void ThroughputMonitorCollection::addPoppedFragmentSample ( double  dataSize)
double ThroughputMonitorCollection::calcBusyPercentage ( MonitoredQuantity::Stats &  stats,
const int &  idx 
) const [private]

Definition at line 316 of file ThroughputMonitorCollection.cc.

References stor::utils::durationToSeconds(), stor::MonitoredQuantity::Stats::recentBinnedDurations, stor::MonitoredQuantity::Stats::recentBinnedSampleCounts, and stor::MonitoredQuantity::Stats::recentBinnedValueSums.

Referenced by do_getStats().

{
  // Derives the busy percentage (0..100) of a thread for bin idx from the
  // idle-time samples recorded in that bin.

  if (stats.recentBinnedSampleCounts[idx] == 0)
  {
    // no idle time was logged at all, so the thread counts as fully busy
    return 100;
  }

  if (stats.recentBinnedSampleCounts[idx] == 1)
  {
    // a single sample means that we waited a whole second on a queue;
    // this should only happen if deq_timed_wait timeout >= statistics calculation period
    return 0;
  }

  const double binSeconds = utils::durationToSeconds(stats.recentBinnedDurations[idx]);
  if (!(stats.recentBinnedValueSums[idx] <= binSeconds))
  {
    // more idle time was logged than the bin lasted, which can happen due to
    // rounding issues; the negated form keeps any unordered comparison here too
    return 0;
  }

  // busy share = whatever part of the bin duration was not spent idle
  return 100.0 * (1.0 - (stats.recentBinnedValueSums[idx] / binSeconds));
}
void ThroughputMonitorCollection::calcPoolUsage ( ) [private]

Definition at line 91 of file ThroughputMonitorCollection.cc.

References stor::MonitoredQuantity::addSample(), stor::MonitoredQuantity::calculateStatistics(), pool_, and poolUsageMQ_.

Referenced by do_calculateStatistics().

{
  // Samples the memory currently used in the pool (if one has been set) and
  // then refreshes the statistics of poolUsageMQ_.
  if (pool_)
  {
    // Tracks whether this call holds the pool lock, so that the handler
    // releases it exactly once and only if it was really acquired. Without
    // this, a failure inside lock() or unlock() itself would lead to an
    // unlock() on a lock this call does not hold.
    bool poolLocked = false;
    try {
      pool_->lock();
      poolLocked = true;
      poolUsageMQ_.addSample(pool_->getMemoryUsage().getUsed());
      // Clear the flag first: if unlock() throws, it must not be retried.
      poolLocked = false;
      pool_->unlock();
    }
    catch (...)
    {
      // Best effort: a failed sample is dropped silently, but the pool must
      // not stay locked.
      if (poolLocked)
      {
        pool_->unlock();
      }
    }
  }
  // Statistics are recalculated even if no sample could be taken.
  poolUsageMQ_.calculateStatistics();
}
void ThroughputMonitorCollection::do_appendInfoSpaceItems ( InfoSpaceItems &  infoSpaceItems) [private, virtual]

Reimplemented from stor::MonitorCollection.

Definition at line 408 of file ThroughputMonitorCollection.cc.

References averagingTime_, diskWriterBusy_, dqmEventProcessorBusy_, dqmQueueBandwidth_, dqmQueueRate_, entriesInDQMQueue_, entriesInFragmentQueue_, entriesInStreamQueue_, fragmentProcessorBusy_, fragmentQueueBandwidth_, fragmentQueueRate_, fragmentStoreMemoryUsed_, fragmentStoreSize_, memoryUsedInDQMQueue_, memoryUsedInFragmentQueue_, memoryUsedInStreamQueue_, poolUsage_, streamQueueBandwidth_, streamQueueRate_, writtenEventsBandwidth_, and writtenEventsRate_.

{
  // Registers every published quantity of this collection as a
  // (name, pointer-to-member-variable) pair. The order of the entries is
  // kept exactly as before.
  InfoSpaceItems& items = infoSpaceItems;

  // memory pool
  items.push_back(std::make_pair("poolUsage", &poolUsage_));

  // fragment queue
  items.push_back(std::make_pair("entriesInFragmentQueue", &entriesInFragmentQueue_));
  items.push_back(std::make_pair("memoryUsedInFragmentQueue", &memoryUsedInFragmentQueue_));
  items.push_back(std::make_pair("fragmentQueueRate", &fragmentQueueRate_));
  items.push_back(std::make_pair("fragmentQueueBandwidth", &fragmentQueueBandwidth_));

  // fragment store
  items.push_back(std::make_pair("fragmentStoreSize", &fragmentStoreSize_));
  items.push_back(std::make_pair("fragmentStoreMemoryUsed", &fragmentStoreMemoryUsed_));

  // stream queue
  items.push_back(std::make_pair("entriesInStreamQueue", &entriesInStreamQueue_));
  items.push_back(std::make_pair("memoryUsedInStreamQueue", &memoryUsedInStreamQueue_));
  items.push_back(std::make_pair("streamQueueRate", &streamQueueRate_));
  items.push_back(std::make_pair("streamQueueBandwidth", &streamQueueBandwidth_));

  // events written to disk
  items.push_back(std::make_pair("writtenEventsRate", &writtenEventsRate_));
  items.push_back(std::make_pair("writtenEventsBandwidth", &writtenEventsBandwidth_));

  // DQM event queue
  items.push_back(std::make_pair("entriesInDQMQueue", &entriesInDQMQueue_));
  items.push_back(std::make_pair("memoryUsedInDQMQueue", &memoryUsedInDQMQueue_));
  items.push_back(std::make_pair("dqmQueueRate", &dqmQueueRate_));
  items.push_back(std::make_pair("dqmQueueBandwidth", &dqmQueueBandwidth_));

  // busy percentages of the worker threads
  items.push_back(std::make_pair("fragmentProcessorBusy", &fragmentProcessorBusy_));
  items.push_back(std::make_pair("diskWriterBusy", &diskWriterBusy_));
  items.push_back(std::make_pair("dqmEventProcessorBusy", &dqmEventProcessorBusy_));

  // duration covered by the averaged values
  items.push_back(std::make_pair("averagingTime", &averagingTime_));
}
void ThroughputMonitorCollection::do_calculateStatistics ( ) [private, virtual]

Implements stor::MonitorCollection.

Definition at line 350 of file ThroughputMonitorCollection.cc.

References stor::MonitoredQuantity::addSample(), calcPoolUsage(), stor::MonitoredQuantity::calculateStatistics(), currentFragmentStoreMemoryUsedMB_, currentFragmentStoreSize_, diskWriterIdleTimeMQ_, diskWriteSizeMQ_, dqmEventProcessorIdleTimeMQ_, dqmEventQueue_, entriesInDQMEventQueueMQ_, entriesInFragmentQueueMQ_, entriesInFragmentStoreMQ_, entriesInStreamQueueMQ_, fragmentProcessorIdleTimeMQ_, fragmentQueue_, memoryUsedInDQMEventQueueMQ_, memoryUsedInFragmentQueueMQ_, memoryUsedInFragmentStoreMQ_, memoryUsedInStreamQueueMQ_, poppedDQMEventSizeMQ_, poppedEventSizeMQ_, poppedFragmentSizeMQ_, and streamQueue_.

void ThroughputMonitorCollection::do_getStats ( Stats &  stats,
const unsigned int  sampleCount 
) const [private]

Definition at line 123 of file ThroughputMonitorCollection.cc.

References stor::ThroughputMonitorCollection::Stats::Snapshot::absoluteTime, stor::ThroughputMonitorCollection::Stats::average, binCount_, calcBusyPercentage(), stor::ThroughputMonitorCollection::Stats::Snapshot::diskWriterBusy, diskWriterIdleTimeMQ_, diskWriteSizeMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::dqmEventProcessorBusy, dqmEventProcessorIdleTimeMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::dqmQueueBandwidth, stor::ThroughputMonitorCollection::Stats::Snapshot::dqmQueueRate, stor::ThroughputMonitorCollection::Stats::Snapshot::duration, entriesInDQMEventQueueMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::entriesInDQMQueue, stor::ThroughputMonitorCollection::Stats::Snapshot::entriesInFragmentQueue, entriesInFragmentQueueMQ_, entriesInFragmentStoreMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::entriesInStreamQueue, entriesInStreamQueueMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::fragmentProcessorBusy, fragmentProcessorIdleTimeMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::fragmentQueueBandwidth, stor::ThroughputMonitorCollection::Stats::Snapshot::fragmentQueueRate, stor::ThroughputMonitorCollection::Stats::Snapshot::fragmentStoreMemoryUsed, stor::ThroughputMonitorCollection::Stats::Snapshot::fragmentStoreSize, getRateAndBandwidth(), stor::MonitoredQuantity::getStats(), memoryUsedInDQMEventQueueMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::memoryUsedInDQMQueue, stor::ThroughputMonitorCollection::Stats::Snapshot::memoryUsedInFragmentQueue, memoryUsedInFragmentQueueMQ_, memoryUsedInFragmentStoreMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::memoryUsedInStreamQueue, memoryUsedInStreamQueueMQ_, stor::ThroughputMonitorCollection::Stats::Snapshot::poolUsage, poolUsageMQ_, poppedDQMEventSizeMQ_, poppedEventSizeMQ_, poppedFragmentSizeMQ_, stor::MonitoredQuantity::Stats::recentBinnedDurations, stor::MonitoredQuantity::Stats::recentBinnedSampleCounts, 
stor::MonitoredQuantity::Stats::recentBinnedSnapshotTimes, stor::MonitoredQuantity::Stats::recentBinnedValueSums, stor::MonitoredQuantity::Stats::recentDuration, stor::ThroughputMonitorCollection::Stats::reset(), smoothIdleTimes(), stor::ThroughputMonitorCollection::Stats::snapshots, stor::ThroughputMonitorCollection::Stats::Snapshot::streamQueueBandwidth, stor::ThroughputMonitorCollection::Stats::Snapshot::streamQueueRate, stor::ThroughputMonitorCollection::Stats::Snapshot::writtenEventsBandwidth, and stor::ThroughputMonitorCollection::Stats::Snapshot::writtenEventsRate.

Referenced by getStats().

{
  // Builds one Stats::Snapshot per bin from the binned statistics of all
  // monitored quantities, appends it to stats.snapshots and accumulates it
  // into stats.average, which is finally divided by the number of snapshots.
  // Only the sampleCount highest bin indices are considered; if sampleCount
  // is at least binCount_, all bins are used.

  // Take a copy of the current statistics of every monitored quantity.
  MonitoredQuantity::Stats fqEntryCountMQ, fqMemoryUsedMQ, fragSizeMQ;
  MonitoredQuantity::Stats fpIdleMQ, fsEntryCountMQ, fsMemoryUsedMQ;
  MonitoredQuantity::Stats sqEntryCountMQ, sqMemoryUsedMQ, eventSizeMQ, dwIdleMQ, diskWriteMQ;
  MonitoredQuantity::Stats dqEntryCountMQ, dqMemoryUsedMQ, dqmEventSizeMQ, dqmIdleMQ, poolUsageMQ;
  poolUsageMQ_.getStats(poolUsageMQ);
  entriesInFragmentQueueMQ_.getStats(fqEntryCountMQ);
  memoryUsedInFragmentQueueMQ_.getStats(fqMemoryUsedMQ);
  poppedFragmentSizeMQ_.getStats(fragSizeMQ);
  fragmentProcessorIdleTimeMQ_.getStats(fpIdleMQ);
  entriesInFragmentStoreMQ_.getStats(fsEntryCountMQ);
  memoryUsedInFragmentStoreMQ_.getStats(fsMemoryUsedMQ);
  entriesInStreamQueueMQ_.getStats(sqEntryCountMQ);
  memoryUsedInStreamQueueMQ_.getStats(sqMemoryUsedMQ);
  poppedEventSizeMQ_.getStats(eventSizeMQ);
  diskWriterIdleTimeMQ_.getStats(dwIdleMQ);
  diskWriteSizeMQ_.getStats(diskWriteMQ);
  entriesInDQMEventQueueMQ_.getStats(dqEntryCountMQ);
  memoryUsedInDQMEventQueueMQ_.getStats(dqMemoryUsedMQ);
  poppedDQMEventSizeMQ_.getStats(dqmEventSizeMQ);
  dqmEventProcessorIdleTimeMQ_.getStats(dqmIdleMQ);

  stats.reset();

  // The idle times are smoothed before busy percentages are derived from them.
  smoothIdleTimes(fpIdleMQ);
  smoothIdleTimes(dwIdleMQ);
  smoothIdleTimes(dqmIdleMQ);

  // Walk from the highest bin index downwards. The fragment queue entry count
  // provides the reference duration and time stamp of each bin.
  const int lowestBin = sampleCount<binCount_ ? binCount_-sampleCount : 0;
  for (int idx = (binCount_ - 1); idx >= lowestBin; --idx)
  {
    utils::Duration_t binDuration = fqEntryCountMQ.recentBinnedDurations[idx];
    if (binDuration < boost::posix_time::milliseconds(10)) continue; //avoid very short durations

    Stats::Snapshot snapshot;

    snapshot.duration = binDuration;
    snapshot.absoluteTime = fqEntryCountMQ.recentBinnedSnapshotTimes[idx];

    // For all "entries"/"memory" quantities below, the snapshot value is the
    // mean of the samples in the bin (value sum / sample count), or 0 if the
    // bin holds no samples.

    // memory pool usage
    snapshot.poolUsage = poolUsageMQ.recentBinnedSampleCounts[idx]>0 ? 
      poolUsageMQ.recentBinnedValueSums[idx]/poolUsageMQ.recentBinnedSampleCounts[idx] :
      0;

    // number of fragments in fragment queue
    snapshot.entriesInFragmentQueue = fqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
      fqEntryCountMQ.recentBinnedValueSums[idx]/fqEntryCountMQ.recentBinnedSampleCounts[idx] :
      0;

    // memory usage in fragment queue
    snapshot.memoryUsedInFragmentQueue = fqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
      fqMemoryUsedMQ.recentBinnedValueSums[idx]/fqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
      0;

    // rate/bandwidth of fragments popped from fragment queue
    getRateAndBandwidth(fragSizeMQ, idx, snapshot.fragmentQueueRate, snapshot.fragmentQueueBandwidth);

    // number of events in fragment store
    // (the mean itself is stored; a trailing "> 0" comparison here would
    // collapse the value to 0 or 1)
    snapshot.fragmentStoreSize = fsEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
      fsEntryCountMQ.recentBinnedValueSums[idx]/fsEntryCountMQ.recentBinnedSampleCounts[idx] :
      0;

    // memory usage in fragment store
    snapshot.fragmentStoreMemoryUsed = fsMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
      fsMemoryUsedMQ.recentBinnedValueSums[idx]/fsMemoryUsedMQ.recentBinnedSampleCounts[idx] :
      0;

    // number of events in stream queue
    // (again the mean itself, not a comparison result)
    snapshot.entriesInStreamQueue = sqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
      sqEntryCountMQ.recentBinnedValueSums[idx]/sqEntryCountMQ.recentBinnedSampleCounts[idx] :
      0;

    // memory usage in stream queue
    snapshot.memoryUsedInStreamQueue = sqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
      sqMemoryUsedMQ.recentBinnedValueSums[idx]/sqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
      0;

    // rate/bandwidth of events popped from stream queue
    getRateAndBandwidth(eventSizeMQ, idx, snapshot.streamQueueRate, snapshot.streamQueueBandwidth);

    // rate/bandwidth of events written to disk
    getRateAndBandwidth(diskWriteMQ, idx, snapshot.writtenEventsRate, snapshot.writtenEventsBandwidth);

    // number of dqm events in DQMEvent queue
    snapshot.entriesInDQMQueue = dqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
      dqEntryCountMQ.recentBinnedValueSums[idx]/dqEntryCountMQ.recentBinnedSampleCounts[idx] :
      0;

    // memory usage in DQMEvent queue
    snapshot.memoryUsedInDQMQueue = dqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
      dqMemoryUsedMQ.recentBinnedValueSums[idx]/dqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
      0;

    // rate/bandwidth of dqm events popped from DQMEvent queue
    getRateAndBandwidth(dqmEventSizeMQ, idx, snapshot.dqmQueueRate, snapshot.dqmQueueBandwidth);

    // fragment processor thread busy percentage
    snapshot.fragmentProcessorBusy =
      calcBusyPercentage(fpIdleMQ, idx);

    // disk writer thread busy percentage
    snapshot.diskWriterBusy =
      calcBusyPercentage(dwIdleMQ, idx);

    // DQMEvent processor thread busy percentage
    snapshot.dqmEventProcessorBusy =
      calcBusyPercentage(dqmIdleMQ, idx);

    stats.average += snapshot;
    stats.snapshots.push_back(snapshot);
  }

  // Turn the accumulated sum into an average; skipped when no bin qualified
  // so that there is no division by zero.
  const size_t snapshotCount = stats.snapshots.size();
  if (snapshotCount > 0)
  {
    stats.average /= snapshotCount;
  }
}
void ThroughputMonitorCollection::do_reset ( ) [private, virtual]
void ThroughputMonitorCollection::do_updateInfoSpaceItems ( ) [private, virtual]

Reimplemented from stor::MonitorCollection.

Definition at line 434 of file ThroughputMonitorCollection.cc.

References stor::ThroughputMonitorCollection::Stats::average, averagingTime_, diskWriterBusy_, dqmEventProcessorBusy_, dqmQueueBandwidth_, dqmQueueRate_, stor::utils::durationToSeconds(), entriesInDQMQueue_, entriesInFragmentQueue_, entriesInStreamQueue_, fragmentProcessorBusy_, fragmentQueueBandwidth_, fragmentQueueRate_, fragmentStoreMemoryUsed_, fragmentStoreSize_, getStats(), memoryUsedInDQMQueue_, memoryUsedInFragmentQueue_, memoryUsedInStreamQueue_, poolUsage_, streamQueueBandwidth_, streamQueueRate_, throuphputAveragingCycles_, writtenEventsBandwidth_, and writtenEventsRate_.

{
  // Copies the averaged throughput statistics into the member variables that
  // do_appendInfoSpaceItems() registers under their published names.

  // Average over the throuphputAveragingCycles_ most recent snapshots only.
  Stats stats;
  getStats(stats, throuphputAveragingCycles_);

  // Values published through unsigned members are cast to unsigned int
  // explicitly; any fractional part of the average is dropped by the cast.
  poolUsage_ = static_cast<unsigned int>(stats.average.poolUsage);
  entriesInFragmentQueue_ = static_cast<unsigned int>(stats.average.entriesInFragmentQueue);
  memoryUsedInFragmentQueue_ = stats.average.memoryUsedInFragmentQueue;
  fragmentQueueRate_ = stats.average.fragmentQueueRate;
  fragmentQueueBandwidth_ = stats.average.fragmentQueueBandwidth;
  fragmentStoreSize_ = static_cast<unsigned int>(stats.average.fragmentStoreSize);
  fragmentStoreMemoryUsed_ = stats.average.fragmentStoreMemoryUsed;
  entriesInStreamQueue_ = static_cast<unsigned int>(stats.average.entriesInStreamQueue);
  memoryUsedInStreamQueue_ = stats.average.memoryUsedInStreamQueue;
  streamQueueRate_ = stats.average.streamQueueRate;
  streamQueueBandwidth_ = stats.average.streamQueueBandwidth;
  writtenEventsRate_ = stats.average.writtenEventsRate;
  writtenEventsBandwidth_ = stats.average.writtenEventsBandwidth;
  entriesInDQMQueue_ = static_cast<unsigned int>(stats.average.entriesInDQMQueue);
  memoryUsedInDQMQueue_ = stats.average.memoryUsedInDQMQueue;
  dqmQueueRate_ = stats.average.dqmQueueRate;
  dqmQueueBandwidth_ = stats.average.dqmQueueBandwidth;
  fragmentProcessorBusy_ = stats.average.fragmentProcessorBusy;
  diskWriterBusy_ = stats.average.diskWriterBusy;
  dqmEventProcessorBusy_ = stats.average.dqmEventProcessorBusy;
  // The averaged snapshot duration is published in seconds.
  averagingTime_ = utils::durationToSeconds(stats.average.duration);
}
int stor::ThroughputMonitorCollection::getBinCount ( ) const [inline]

Definition at line 41 of file ThroughputMonitorCollection.h.

References binCount_.

{return binCount_;} // binCount_ is an unsigned int member; it is converted to the int return type here
MonitoredQuantity& stor::ThroughputMonitorCollection::getDiskWriteMQ ( ) [inline]

Definition at line 147 of file ThroughputMonitorCollection.h.

References diskWriteSizeMQ_.

                                        {
      // Non-const overload: hands out diskWriteSizeMQ_ by reference so the caller can modify it.
      return diskWriteSizeMQ_;
    }
const MonitoredQuantity& stor::ThroughputMonitorCollection::getDiskWriteMQ ( ) const [inline]

Definition at line 144 of file ThroughputMonitorCollection.h.

References diskWriteSizeMQ_.

                                                    {
      // Const overload: read-only reference to diskWriteSizeMQ_.
      return diskWriteSizeMQ_;
    }
const MonitoredQuantity& stor::ThroughputMonitorCollection::getDiskWriterIdleMQ ( ) const [inline]

Definition at line 135 of file ThroughputMonitorCollection.h.

References diskWriterIdleTimeMQ_.

                                                         {
      // Const overload: read-only reference to diskWriterIdleTimeMQ_.
      return diskWriterIdleTimeMQ_;
    }
MonitoredQuantity& stor::ThroughputMonitorCollection::getDiskWriterIdleMQ ( ) [inline]

Definition at line 138 of file ThroughputMonitorCollection.h.

References diskWriterIdleTimeMQ_.

                                             {
      // Non-const overload: hands out diskWriterIdleTimeMQ_ by reference so the caller can modify it.
      return diskWriterIdleTimeMQ_;
    }
const MonitoredQuantity& stor::ThroughputMonitorCollection::getDQMEventProcessorIdleMQ ( ) const [inline]
MonitoredQuantity& stor::ThroughputMonitorCollection::getDQMEventProcessorIdleMQ ( ) [inline]
const MonitoredQuantity& stor::ThroughputMonitorCollection::getDQMEventQueueEntryCountMQ ( ) const [inline]

Definition at line 155 of file ThroughputMonitorCollection.h.

References entriesInDQMEventQueueMQ_.

MonitoredQuantity& stor::ThroughputMonitorCollection::getDQMEventQueueEntryCountMQ ( ) [inline]

Definition at line 158 of file ThroughputMonitorCollection.h.

References entriesInDQMEventQueueMQ_.

const MonitoredQuantity& stor::ThroughputMonitorCollection::getDQMEventQueueMemoryUsedMQ ( ) const [inline]
MonitoredQuantity& stor::ThroughputMonitorCollection::getDQMEventQueueMemoryUsedMQ ( ) [inline]
const MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentProcessorIdleMQ ( ) const [inline]
MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentProcessorIdleMQ ( ) [inline]
const MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentQueueEntryCountMQ ( ) const [inline]

Definition at line 60 of file ThroughputMonitorCollection.h.

References entriesInFragmentQueueMQ_.

MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentQueueEntryCountMQ ( ) [inline]

Definition at line 63 of file ThroughputMonitorCollection.h.

References entriesInFragmentQueueMQ_.

MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentQueueMemoryUsedMQ ( ) [inline]
const MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentQueueMemoryUsedMQ ( ) const [inline]
const MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentStoreEntryCountMQ ( ) const [inline]

Definition at line 92 of file ThroughputMonitorCollection.h.

References entriesInFragmentStoreMQ_.

MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentStoreEntryCountMQ ( ) [inline]

Definition at line 95 of file ThroughputMonitorCollection.h.

References entriesInFragmentStoreMQ_.

const MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentStoreMemoryUsedMQ ( ) const [inline]
MonitoredQuantity& stor::ThroughputMonitorCollection::getFragmentStoreMemoryUsedMQ ( ) [inline]
const MonitoredQuantity& stor::ThroughputMonitorCollection::getPoolUsageMQ ( ) const [inline]

Definition at line 53 of file ThroughputMonitorCollection.h.

References poolUsageMQ_.

                                                    {
      // Const overload: read-only reference to poolUsageMQ_.
      return poolUsageMQ_;
    }
MonitoredQuantity& stor::ThroughputMonitorCollection::getPoolUsageMQ ( ) [inline]

Definition at line 56 of file ThroughputMonitorCollection.h.

References poolUsageMQ_.

                                        {
      // Non-const overload: hands out poolUsageMQ_ by reference so the caller can modify it.
      return poolUsageMQ_;
    }
const MonitoredQuantity& stor::ThroughputMonitorCollection::getPoppedDQMEventSizeMQ ( ) const [inline]

Definition at line 171 of file ThroughputMonitorCollection.h.

References poppedDQMEventSizeMQ_.

                                                             {
      // Const overload: read-only reference to poppedDQMEventSizeMQ_.
      return poppedDQMEventSizeMQ_;
    }
MonitoredQuantity& stor::ThroughputMonitorCollection::getPoppedDQMEventSizeMQ ( ) [inline]

Definition at line 174 of file ThroughputMonitorCollection.h.

References poppedDQMEventSizeMQ_.

                                                 {
      // Non-const overload: hands out poppedDQMEventSizeMQ_ by reference so the caller can modify it.
      return poppedDQMEventSizeMQ_;
    }
MonitoredQuantity& stor::ThroughputMonitorCollection::getPoppedEventSizeMQ ( ) [inline]

Definition at line 129 of file ThroughputMonitorCollection.h.

References poppedEventSizeMQ_.

                                              {
      // Non-const overload: hands out poppedEventSizeMQ_ by reference so the caller can modify it.
      return poppedEventSizeMQ_;
    }
const MonitoredQuantity& stor::ThroughputMonitorCollection::getPoppedEventSizeMQ ( ) const [inline]

Definition at line 126 of file ThroughputMonitorCollection.h.

References poppedEventSizeMQ_.

                                                          {
      // Const overload: read-only reference to poppedEventSizeMQ_.
      return poppedEventSizeMQ_;
    }
const MonitoredQuantity& stor::ThroughputMonitorCollection::getPoppedFragmentSizeMQ ( ) const [inline]

Definition at line 76 of file ThroughputMonitorCollection.h.

References poppedFragmentSizeMQ_.

                                                             {
      // Const overload: read-only reference to poppedFragmentSizeMQ_.
      return poppedFragmentSizeMQ_;
    }
MonitoredQuantity& stor::ThroughputMonitorCollection::getPoppedFragmentSizeMQ ( ) [inline]

Definition at line 79 of file ThroughputMonitorCollection.h.

References poppedFragmentSizeMQ_.

                                                 {
      // Non-const overload: hands out poppedFragmentSizeMQ_ by reference so the caller can modify it.
      return poppedFragmentSizeMQ_;
    }
void ThroughputMonitorCollection::getRateAndBandwidth ( MonitoredQuantity::Stats &  stats,
const int &  idx,
double &  rate,
double &  bandwidth 
) const [private]

Definition at line 295 of file ThroughputMonitorCollection.cc.

References stor::utils::durationToSeconds(), stor::MonitoredQuantity::Stats::recentBinnedDurations, stor::MonitoredQuantity::Stats::recentBinnedSampleCounts, and stor::MonitoredQuantity::Stats::recentBinnedValueSums.

Referenced by do_getStats().

{
  // Computes, for bin idx of the given statistics, the sample rate (samples
  // per second) and the bandwidth (summed sample values divided by 1024*1024,
  // per second -- presumably MB/s for sizes given in bytes; TODO confirm).
  // Both output parameters are always assigned: for a bin without positive
  // duration they are set to 0 instead of being left untouched.
  const double recentBinnedDuration = utils::durationToSeconds(stats.recentBinnedDurations[idx]);
  if (recentBinnedDuration > 0)
  {
    rate =
      stats.recentBinnedSampleCounts[idx] / recentBinnedDuration;
    
    bandwidth =
      stats.recentBinnedValueSums[idx] / (1024*1024) 
      / recentBinnedDuration;
  }
  else
  {
    // No time elapsed in this bin, so nothing can be derived from it; this
    // also avoids a division by zero.
    rate = 0;
    bandwidth = 0;
  }
}
void ThroughputMonitorCollection::getStats ( Stats &  stats) const

Write all our collected statistics into the given Stats struct.

Definition at line 109 of file ThroughputMonitorCollection.cc.

References binCount_, do_getStats(), and statsMutex_.

Referenced by stor::SMWebPageHelper::addDOMforResourceUsage(), stor::SMWebPageHelper::addDOMforThroughputStatistics(), and do_updateInfoSpaceItems().

{
  // statsMutex_ is held for the whole call; the scoped_lock releases it when
  // the scope is left.
  boost::mutex::scoped_lock sl(statsMutex_);
  // Passing binCount_ as the sample count makes do_getStats() look at every bin.
  do_getStats(stats, binCount_);
}
void ThroughputMonitorCollection::getStats ( Stats &  stats,
const unsigned int  sampleCount 
) const

Write only the sampleCount most recent snapshots into the given Stats struct.

Definition at line 116 of file ThroughputMonitorCollection.cc.

References do_getStats(), and statsMutex_.

{
  // statsMutex_ is held for the whole call; the scoped_lock releases it when
  // the scope is left.
  boost::mutex::scoped_lock sl(statsMutex_);
  // The caller-supplied sampleCount limits how many bins do_getStats() uses.
  do_getStats(stats, sampleCount);
}
const MonitoredQuantity& stor::ThroughputMonitorCollection::getStreamQueueEntryCountMQ ( ) const [inline]

Definition at line 110 of file ThroughputMonitorCollection.h.

References entriesInStreamQueueMQ_.

MonitoredQuantity& stor::ThroughputMonitorCollection::getStreamQueueEntryCountMQ ( ) [inline]

Definition at line 113 of file ThroughputMonitorCollection.h.

References entriesInStreamQueueMQ_.

MonitoredQuantity& stor::ThroughputMonitorCollection::getStreamQueueMemoryUsedMQ ( ) [inline]

Definition at line 120 of file ThroughputMonitorCollection.h.

References memoryUsedInStreamQueueMQ_.

const MonitoredQuantity& stor::ThroughputMonitorCollection::getStreamQueueMemoryUsedMQ ( ) const [inline]

Definition at line 117 of file ThroughputMonitorCollection.h.

References memoryUsedInStreamQueueMQ_.

ThroughputMonitorCollection& stor::ThroughputMonitorCollection::operator= ( ThroughputMonitorCollection const &  ) [private]
void stor::ThroughputMonitorCollection::setDQMEventQueue ( DQMEventQueuePtr  dqmEventQueue) [inline]

Definition at line 151 of file ThroughputMonitorCollection.h.

References dqmEventQueue_.

                                                          {
      // Stores the given queue pointer in dqmEventQueue_, replacing any earlier value.
      dqmEventQueue_ = dqmEventQueue;
    }
void stor::ThroughputMonitorCollection::setFragmentQueue ( FragmentQueuePtr  fragmentQueue) [inline]

Definition at line 49 of file ThroughputMonitorCollection.h.

References fragmentQueue_.

                                                          {
      // Keep the fragment queue pointer handed in by the caller in fragmentQueue_.
      fragmentQueue_ = fragmentQueue;
    }
void stor::ThroughputMonitorCollection::setFragmentStoreMemoryUsed ( size_t  memoryUsed) [inline]

Sets the current amount of memory used by the fragment store. The given value is divided by 1024*1024 and stored in MB.

Definition at line 197 of file ThroughputMonitorCollection.h.

References currentFragmentStoreMemoryUsedMB_.

Referenced by stor::Processing::do_processI2OFragment().

                                                              {
      // Scale the given amount down by 1024*1024 before storing it in the
      // MB-valued member (presumably memoryUsed is in bytes -- confirm with caller).
      const int unitsPerMB = 1024*1024;
      currentFragmentStoreMemoryUsedMB_ = static_cast<double>(memoryUsed) / unitsPerMB;
    }
void stor::ThroughputMonitorCollection::setFragmentStoreSize ( unsigned int  size) [inline]

Sets the current number of events in the fragment store.

Definition at line 190 of file ThroughputMonitorCollection.h.

References currentFragmentStoreSize_, and findQualityFiles::size.

Referenced by stor::Processing::do_processI2OFragment().

void ThroughputMonitorCollection::setMemoryPoolPointer ( toolbox::mem::Pool *  pool)

Stores the given memory pool pointer if not yet set. If it is already set, the argument is ignored.

Definition at line 39 of file ThroughputMonitorCollection.cc.

References fetchall_from_DQM_v2::pool, and pool_.

Referenced by stor::StorageManager::receiveRegistryMessage().

{
  // A pool pointer that has already been stored always wins:
  // in that case the argument is silently dropped.
  if (pool_)
  {
    return;
  }

  pool_ = pool;
}
void stor::ThroughputMonitorCollection::setStreamQueue ( StreamQueuePtr  streamQueue) [inline]

Definition at line 106 of file ThroughputMonitorCollection.h.

References streamQueue_.

                                                    {
      // Keep the stream queue pointer handed in by the caller in streamQueue_.
      streamQueue_ = streamQueue;
    }
void ThroughputMonitorCollection::smoothIdleTimes ( MonitoredQuantity::Stats &  stats) const [private]

Smooth out binned idle times for the throughput display. The smoothing is done section by section by smoothIdleTimesHelper(), which returns the index to be used for the next section to smooth. Note that the helper works on the idleTimes and durations lists in *reverse* order. So, the initial indices should be idleTimes.size()-1.

Definition at line 246 of file ThroughputMonitorCollection.cc.

References binCount_, getHLTprescales::index, stor::MonitoredQuantity::Stats::recentBinnedDurations, stor::MonitoredQuantity::Stats::recentBinnedValueSums, and smoothIdleTimesHelper().

Referenced by do_getStats().

{
  // Walk from the last bin towards bin 0. Every helper call starts with a
  // one-bin section (first == last) and hands back the index at which the
  // next section begins; a negative value ends the walk.
  // NOTE(review): assumes both vectors hold at least binCount_ entries --
  // confirm in do_getStats().
  for (int nextIndex = binCount_ - 1; nextIndex >= 0; )
  {
    nextIndex = smoothIdleTimesHelper(stats.recentBinnedValueSums,
                                      stats.recentBinnedDurations,
                                      nextIndex, nextIndex);
  }
}
int ThroughputMonitorCollection::smoothIdleTimesHelper ( std::vector< double > &  idleTimes,
std::vector< utils::Duration_t > &  durations,
int  firstIndex,
int  lastIndex 
) const [private]

Definition at line 259 of file ThroughputMonitorCollection.cc.

References stor::utils::durationToSeconds(), and stor::utils::secondsToDuration().

Referenced by smoothIdleTimes().

{
  // Grow the section [firstIndex, lastIndex] towards lower indices until the
  // idle time summed over the section no longer exceeds the summed duration
  // (or index 0 is reached). Then, if the section spans more than one bin,
  // replace every bin in it by the section average.
  for (;;)
  {
    // Totals are recomputed from scratch, in ascending index order, each
    // time the section grows.
    double totalIdle = 0;
    double totalDuration = 0;
    for (int bin = firstIndex; bin <= lastIndex; ++bin)
    {
      totalIdle += idleTimes[bin];
      totalDuration += utils::durationToSeconds(durations[bin]);
    }

    const bool idleExceedsDuration = (totalIdle > totalDuration);
    if (idleExceedsDuration && firstIndex > 0)
    {
      // Pull in one more (earlier) bin and try again.
      --firstIndex;
      continue;
    }

    if (lastIndex > firstIndex)
    {
      const int binsInSection = lastIndex - firstIndex + 1;
      const double meanIdle = totalIdle / binsInSection;
      const double meanDurationSeconds = totalDuration / binsInSection;
      for (int bin = firstIndex; bin <= lastIndex; ++bin)
      {
        idleTimes[bin] = meanIdle;
        durations[bin] = utils::secondsToDuration(meanDurationSeconds);
      }
    }

    // Index at which the caller should start the next section.
    return (firstIndex - 1);
  }
}

Member Data Documentation

const unsigned int stor::ThroughputMonitorCollection::binCount_ [private]

Definition at line 299 of file ThroughputMonitorCollection.h.

Referenced by do_getStats(), getBinCount(), getStats(), and smoothIdleTimes().

Definition at line 325 of file ThroughputMonitorCollection.h.

Referenced by do_calculateStatistics(), and setFragmentStoreSize().

Definition at line 323 of file ThroughputMonitorCollection.h.

Referenced by do_calculateStatistics(), and setDQMEventQueue().

xdata::UnsignedInteger32 stor::ThroughputMonitorCollection::entriesInDQMQueue_ [private]

Definition at line 321 of file ThroughputMonitorCollection.h.

Referenced by do_calculateStatistics(), and setFragmentQueue().

xdata::UnsignedInteger32 stor::ThroughputMonitorCollection::fragmentStoreSize_ [private]
toolbox::mem::Pool* stor::ThroughputMonitorCollection::pool_ [private]

Definition at line 329 of file ThroughputMonitorCollection.h.

Referenced by calcPoolUsage(), and setMemoryPoolPointer().

xdata::UnsignedInteger32 stor::ThroughputMonitorCollection::poolUsage_ [private]

Definition at line 300 of file ThroughputMonitorCollection.h.

Referenced by getStats().

Definition at line 322 of file ThroughputMonitorCollection.h.

Referenced by do_calculateStatistics(), and setStreamQueue().

Definition at line 327 of file ThroughputMonitorCollection.h.

Referenced by do_updateInfoSpaceItems().