12 const unsigned int& throuphputAveragingCycles
16 poolUsageMQ_(updateInterval, updateInterval*binCount_),
17 entriesInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
18 memoryUsedInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
19 poppedFragmentSizeMQ_(updateInterval, updateInterval*binCount_),
20 fragmentProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
21 entriesInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
22 memoryUsedInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
23 entriesInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
24 memoryUsedInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
25 poppedEventSizeMQ_(updateInterval, updateInterval*binCount_),
26 diskWriterIdleTimeMQ_(updateInterval, updateInterval*binCount_),
27 diskWriteSizeMQ_(updateInterval, updateInterval*binCount_),
28 entriesInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
29 memoryUsedInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
30 poppedDQMEventSizeMQ_(updateInterval, updateInterval*binCount_),
31 dqmEventProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
32 currentFragmentStoreSize_(0),
33 currentFragmentStoreMemoryUsedMB_(0),
34 throuphputAveragingCycles_(throuphputAveragingCycles),
154 for (
int idx = (
binCount_ - 1); idx >= lowestBin; --idx)
157 relativeTime -= binDuration;
158 if (binDuration < boost::posix_time::milliseconds(10))
continue;
238 const size_t snapshotCount = stats.
snapshots.size();
239 if (snapshotCount > 0)
241 stats.
average /= snapshotCount;
260 std::vector<double>& idleTimes,
261 std::vector<utils::Duration_t>& durations,
262 int firstIndex,
int lastIndex
265 int workingSize = lastIndex - firstIndex + 1;
266 double idleTimeSum = 0;
267 double durationSum = 0;
269 for (
int idx = firstIndex; idx <= lastIndex; ++idx)
271 idleTimeSum += idleTimes[idx];
275 if (idleTimeSum > durationSum && firstIndex > 0)
281 if (lastIndex > firstIndex)
283 for (
int idx = firstIndex; idx <= lastIndex; ++idx)
285 idleTimes[idx] = idleTimeSum / workingSize;
289 return (firstIndex - 1);
303 if (recentBinnedDuration > 0)
310 / recentBinnedDuration;
321 double busyPercentage;
325 busyPercentage = 100;
346 return busyPercentage;
410 infoSpaceItems.push_back(std::make_pair(
"poolUsage", &
poolUsage_));
419 infoSpaceItems.push_back(std::make_pair(
"streamQueueRate", &
streamQueueRate_));
425 infoSpaceItems.push_back(std::make_pair(
"dqmQueueRate", &
dqmQueueRate_));
428 infoSpaceItems.push_back(std::make_pair(
"diskWriterBusy", &
diskWriterBusy_));
430 infoSpaceItems.push_back(std::make_pair(
"averagingTime", &
averagingTime_));
464 duration(boost::posix_time::
seconds(0)),
466 entriesInFragmentQueue(0),
467 memoryUsedInFragmentQueue(0),
468 fragmentQueueRate(0),
469 fragmentQueueBandwidth(0),
470 fragmentStoreSize(0),
471 fragmentStoreMemoryUsed(0),
472 entriesInStreamQueue(0),
473 memoryUsedInStreamQueue(0),
475 streamQueueBandwidth(0),
476 writtenEventsRate(0),
477 writtenEventsBandwidth(0),
478 entriesInDQMQueue(0),
479 memoryUsedInDQMQueue(0),
481 dqmQueueBandwidth(0),
482 fragmentProcessorBusy(0),
484 dqmEventProcessorBusy(0)
550 entriesInFragmentQueue /=
value;
551 memoryUsedInFragmentQueue /=
value;
552 fragmentQueueRate /=
value;
553 fragmentQueueBandwidth /=
value;
554 fragmentStoreSize /=
value;
555 fragmentStoreMemoryUsed /=
value;
556 entriesInStreamQueue /=
value;
557 memoryUsedInStreamQueue /=
value;
558 streamQueueRate /=
value;
559 streamQueueBandwidth /=
value;
560 writtenEventsRate /=
value;
561 writtenEventsBandwidth /=
value;
562 entriesInDQMQueue /=
value;
563 memoryUsedInDQMQueue /=
value;
564 dqmQueueRate /=
value;
565 dqmQueueBandwidth /=
value;
566 fragmentProcessorBusy /=
value;
567 diskWriterBusy /=
value;
568 dqmEventProcessorBusy /=
value;
xdata::Double fragmentQueueRate_
xdata::Double dqmQueueBandwidth_
MonitoredQuantity memoryUsedInFragmentStoreMQ_
xdata::Double writtenEventsRate_
MonitoredQuantity diskWriteSizeMQ_
xdata::UnsignedInteger32 entriesInDQMQueue_
ThroughputMonitorCollection(const utils::Duration_t &updateInterval, const unsigned int &throuphputAveragingCycles)
Snapshot operator+=(const Snapshot &)
double entriesInStreamQueue
xdata::Double memoryUsedInDQMQueue_
utils::Duration_t recentDuration
xdata::Double streamQueueBandwidth_
MonitoredQuantity fragmentProcessorIdleTimeMQ_
StreamQueuePtr streamQueue_
void addSample(const double &value=1)
MonitoredQuantity poolUsageMQ_
std::vector< uint64_t > recentBinnedSampleCounts
xdata::Double averagingTime_
Duration_t secondsToDuration(double const &seconds)
double entriesInFragmentQueue
MonitoredQuantity poppedDQMEventSizeMQ_
xdata::Double memoryUsedInStreamQueue_
double streamQueueBandwidth
xdata::UnsignedInteger32 entriesInStreamQueue_
MonitoredQuantity memoryUsedInDQMEventQueueMQ_
void getStats(Stats &stats) const
MonitoredQuantity poppedEventSizeMQ_
xdata::Double streamQueueRate_
MonitoredQuantity entriesInFragmentQueueMQ_
std::vector< utils::TimePoint_t > recentBinnedSnapshotTimes
double fragmentQueueBandwidth
int smoothIdleTimesHelper(std::vector< double > &idleTimes, std::vector< utils::Duration_t > &durations, int firstIndex, int lastIndex) const
double fragmentStoreMemoryUsed
void addPoppedFragmentSample(double dataSize)
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
void calculateStatistics(const utils::TimePoint_t &currentTime=utils::getCurrentTime())
double dqmEventProcessorBusy
virtual void do_calculateStatistics()
boost::posix_time::time_duration Duration_t
double fragmentProcessorBusy
xdata::Double dqmQueueRate_
toolbox::mem::Pool * pool_
double writtenEventsBandwidth
virtual void do_updateInfoSpaceItems()
double calcBusyPercentage(MonitoredQuantity::Stats &, const int &idx) const
void do_getStats(Stats &, const unsigned int sampleCount) const
const unsigned int binCount_
MonitoredQuantity poppedFragmentSizeMQ_
void setMemoryPoolPointer(toolbox::mem::Pool *)
void addPoppedDQMEventSample(double dataSize)
utils::Duration_t duration
void addDiskWriteSample(double dataSize)
double memoryUsedInFragmentQueue
xdata::UnsignedInteger32 fragmentStoreSize_
void getRateAndBandwidth(MonitoredQuantity::Stats &stats, const int &idx, double &rate, double &bandwidth) const
void getStats(Stats &) const
utils::TimePoint_t absoluteTime
xdata::UnsignedInteger32 entriesInFragmentQueue_
std::vector< double > recentBinnedValueSums
MonitoredQuantity diskWriterIdleTimeMQ_
void addDiskWriterIdleSample(utils::Duration_t idleTime)
xdata::UnsignedInteger32 poolUsage_
xdata::Double fragmentProcessorBusy_
MonitoredQuantity entriesInDQMEventQueueMQ_
xdata::Double diskWriterBusy_
DQMEventQueuePtr dqmEventQueue_
MonitoredQuantity dqmEventProcessorIdleTimeMQ_
double memoryUsedInDQMQueue
MonitoredQuantity memoryUsedInStreamQueueMQ_
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
Snapshot operator=(const Snapshot &)
xdata::Double memoryUsedInFragmentQueue_
void addPoppedEventSample(double dataSize)
void smoothIdleTimes(MonitoredQuantity::Stats &) const
unsigned int currentFragmentStoreSize_
std::vector< utils::Duration_t > recentBinnedDurations
xdata::Double fragmentStoreMemoryUsed_
void addFragmentProcessorIdleSample(utils::Duration_t idleTime)
double currentFragmentStoreMemoryUsedMB_
double durationToSeconds(Duration_t const &)
MonitoredQuantity entriesInFragmentStoreMQ_
Snapshot operator/=(const double &)
MonitoredQuantity memoryUsedInFragmentQueueMQ_
xdata::Double writtenEventsBandwidth_
unsigned int throuphputAveragingCycles_
void addDQMEventProcessorIdleSample(utils::Duration_t idleTime)
FragmentQueuePtr fragmentQueue_
xdata::Double fragmentQueueBandwidth_
xdata::Double dqmEventProcessorBusy_
double memoryUsedInStreamQueue
MonitoredQuantity entriesInStreamQueueMQ_