CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ThroughputMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: ThroughputMonitorCollection.cc,v 1.23.4.1 2011/03/07 11:33:05 mommsen Exp $
3 
6 
7 using namespace stor;
8 
10 (
11  const utils::Duration_t& updateInterval,
12  const unsigned int& throuphputAveragingCycles
13 ) :
14  MonitorCollection(updateInterval),
15  binCount_(300),
16  poolUsageMQ_(updateInterval, updateInterval*binCount_),
17  entriesInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
18  memoryUsedInFragmentQueueMQ_(updateInterval, updateInterval*binCount_),
19  poppedFragmentSizeMQ_(updateInterval, updateInterval*binCount_),
20  fragmentProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
21  entriesInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
22  memoryUsedInFragmentStoreMQ_(updateInterval, updateInterval*binCount_),
23  entriesInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
24  memoryUsedInStreamQueueMQ_(updateInterval, updateInterval*binCount_),
25  poppedEventSizeMQ_(updateInterval, updateInterval*binCount_),
26  diskWriterIdleTimeMQ_(updateInterval, updateInterval*binCount_),
27  diskWriteSizeMQ_(updateInterval, updateInterval*binCount_),
28  entriesInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
29  memoryUsedInDQMEventQueueMQ_(updateInterval, updateInterval*binCount_),
30  poppedDQMEventSizeMQ_(updateInterval, updateInterval*binCount_),
31  dqmEventProcessorIdleTimeMQ_(updateInterval, updateInterval*binCount_),
32  currentFragmentStoreSize_(0),
33  currentFragmentStoreMemoryUsedMB_(0),
34  throuphputAveragingCycles_(throuphputAveragingCycles),
35  pool_(0)
36 {}
37 
38 
40 {
41  if ( ! pool_)
42  pool_ = pool;
43 }
44 
45 
47 {
49 }
50 
51 
54 {
56 }
57 
58 
60 {
61  poppedEventSizeMQ_.addSample(dataSize);
62 }
63 
64 
67 {
69 }
70 
71 
73 {
74  diskWriteSizeMQ_.addSample(dataSize);
75 }
76 
77 
79 {
81 }
82 
83 
86 {
88 }
89 
90 
92 {
93  if (pool_)
94  {
95  try {
96  pool_->lock();
97  poolUsageMQ_.addSample(pool_->getMemoryUsage().getUsed());
98  pool_->unlock();
99  }
100  catch (...)
101  {
102  pool_->unlock();
103  }
104  }
106 }
107 
108 
110 {
111  boost::mutex::scoped_lock sl(statsMutex_);
112  do_getStats(stats, binCount_);
113 }
114 
115 
116 void ThroughputMonitorCollection::getStats(Stats& stats, const unsigned int sampleCount) const
117 {
118  boost::mutex::scoped_lock sl(statsMutex_);
119  do_getStats(stats, sampleCount);
120 }
121 
122 
123 void ThroughputMonitorCollection::do_getStats(Stats& stats, const unsigned int sampleCount) const
124 {
125  MonitoredQuantity::Stats fqEntryCountMQ, fqMemoryUsedMQ, fragSizeMQ;
126  MonitoredQuantity::Stats fpIdleMQ, fsEntryCountMQ, fsMemoryUsedMQ;
127  MonitoredQuantity::Stats sqEntryCountMQ, sqMemoryUsedMQ, eventSizeMQ, dwIdleMQ, diskWriteMQ;
128  MonitoredQuantity::Stats dqEntryCountMQ, dqMemoryUsedMQ, dqmEventSizeMQ, dqmIdleMQ, poolUsageMQ;
129  poolUsageMQ_.getStats(poolUsageMQ);
130  entriesInFragmentQueueMQ_.getStats(fqEntryCountMQ);
131  memoryUsedInFragmentQueueMQ_.getStats(fqMemoryUsedMQ);
132  poppedFragmentSizeMQ_.getStats(fragSizeMQ);
134  entriesInFragmentStoreMQ_.getStats(fsEntryCountMQ);
135  memoryUsedInFragmentStoreMQ_.getStats(fsMemoryUsedMQ);
136  entriesInStreamQueueMQ_.getStats(sqEntryCountMQ);
137  memoryUsedInStreamQueueMQ_.getStats(sqMemoryUsedMQ);
138  poppedEventSizeMQ_.getStats(eventSizeMQ);
140  diskWriteSizeMQ_.getStats(diskWriteMQ);
141  entriesInDQMEventQueueMQ_.getStats(dqEntryCountMQ);
142  memoryUsedInDQMEventQueueMQ_.getStats(dqMemoryUsedMQ);
143  poppedDQMEventSizeMQ_.getStats(dqmEventSizeMQ);
145 
146  stats.reset();
147 
148  smoothIdleTimes(fpIdleMQ);
149  smoothIdleTimes(dwIdleMQ);
150  smoothIdleTimes(dqmIdleMQ);
151 
152  utils::Duration_t relativeTime = fqEntryCountMQ.recentDuration;
153  const int lowestBin = sampleCount<binCount_ ? binCount_-sampleCount : 0;
154  for (int idx = (binCount_ - 1); idx >= lowestBin; --idx)
155  {
156  utils::Duration_t binDuration = fqEntryCountMQ.recentBinnedDurations[idx];
157  relativeTime -= binDuration;
158  if (binDuration < boost::posix_time::milliseconds(10)) continue; //avoid very short durations
159 
160  Stats::Snapshot snapshot;
161 
162  snapshot.duration = binDuration;
163  snapshot.absoluteTime = fqEntryCountMQ.recentBinnedSnapshotTimes[idx];
164 
165  // memory pool usage
166  snapshot.poolUsage = poolUsageMQ.recentBinnedSampleCounts[idx]>0 ?
167  poolUsageMQ.recentBinnedValueSums[idx]/poolUsageMQ.recentBinnedSampleCounts[idx] :
168  0;
169 
170  // number of fragments in fragment queue
171  snapshot.entriesInFragmentQueue = fqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
172  fqEntryCountMQ.recentBinnedValueSums[idx]/fqEntryCountMQ.recentBinnedSampleCounts[idx] :
173  0;
174 
175  // memory usage in fragment queue
176  snapshot.memoryUsedInFragmentQueue = fqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
177  fqMemoryUsedMQ.recentBinnedValueSums[idx]/fqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
178  0;
179 
180  // rate/bandwidth of fragments popped from fragment queue
181  getRateAndBandwidth(fragSizeMQ, idx, snapshot.fragmentQueueRate, snapshot.fragmentQueueBandwidth);
182 
183  // number of events in fragment store
184  snapshot.fragmentStoreSize = fsEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
185  fsEntryCountMQ.recentBinnedValueSums[idx]/fsEntryCountMQ.recentBinnedSampleCounts[idx]>0 :
186  0;
187 
188  // memory usage in fragment store
189  snapshot.fragmentStoreMemoryUsed = fsMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
190  fsMemoryUsedMQ.recentBinnedValueSums[idx]/fsMemoryUsedMQ.recentBinnedSampleCounts[idx] :
191  0;
192 
193  // number of events in stream queue
194  snapshot.entriesInStreamQueue = sqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
195  sqEntryCountMQ.recentBinnedValueSums[idx]/sqEntryCountMQ.recentBinnedSampleCounts[idx]>0 :
196  0;
197 
198  // memory usage in stream queue
199  snapshot.memoryUsedInStreamQueue = sqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
200  sqMemoryUsedMQ.recentBinnedValueSums[idx]/sqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
201  0;
202 
203  // rate/bandwidth of events popped from stream queue
204  getRateAndBandwidth(eventSizeMQ, idx, snapshot.streamQueueRate, snapshot.streamQueueBandwidth);
205 
206  // rate/bandwidth of events written to disk
207  getRateAndBandwidth(diskWriteMQ, idx, snapshot.writtenEventsRate, snapshot.writtenEventsBandwidth);
208 
209  // number of dqm events in DQMEvent queue
210  snapshot.entriesInDQMQueue = dqEntryCountMQ.recentBinnedSampleCounts[idx]>0 ?
211  dqEntryCountMQ.recentBinnedValueSums[idx]/dqEntryCountMQ.recentBinnedSampleCounts[idx] :
212  0;
213 
214  // memory usage in DQMEvent queue
215  snapshot.memoryUsedInDQMQueue = dqMemoryUsedMQ.recentBinnedSampleCounts[idx]>0 ?
216  dqMemoryUsedMQ.recentBinnedValueSums[idx]/dqMemoryUsedMQ.recentBinnedSampleCounts[idx] :
217  0;
218 
219  // rate/bandwidth of dqm events popped from DQMEvent queue
220  getRateAndBandwidth(dqmEventSizeMQ, idx, snapshot.dqmQueueRate, snapshot.dqmQueueBandwidth);
221 
222  // fragment processor thread busy percentage
223  snapshot.fragmentProcessorBusy =
224  calcBusyPercentage(fpIdleMQ, idx);
225 
226  // disk writer thread busy percentage
227  snapshot.diskWriterBusy =
228  calcBusyPercentage(dwIdleMQ, idx);
229 
230  // DQMEvent processor thread busy percentage
231  snapshot.dqmEventProcessorBusy =
232  calcBusyPercentage(dqmIdleMQ, idx);
233 
234  stats.average += snapshot;
235  stats.snapshots.push_back(snapshot);
236  }
237 
238  const size_t snapshotCount = stats.snapshots.size();
239  if (snapshotCount > 0)
240  {
241  stats.average /= snapshotCount;
242  }
243 }
244 
245 
247 {
248  int index = binCount_ - 1;
249  while (index >= 0)
250  {
252  stats.recentBinnedDurations,
253  index, index);
254  }
255 }
256 
257 
259 (
260  std::vector<double>& idleTimes,
261  std::vector<utils::Duration_t>& durations,
262  int firstIndex, int lastIndex
263 ) const
264 {
265  int workingSize = lastIndex - firstIndex + 1;
266  double idleTimeSum = 0;
267  double durationSum = 0;
268 
269  for (int idx = firstIndex; idx <= lastIndex; ++idx)
270  {
271  idleTimeSum += idleTimes[idx];
272  durationSum += utils::durationToSeconds(durations[idx]);
273  }
274 
275  if (idleTimeSum > durationSum && firstIndex > 0)
276  {
277  return smoothIdleTimesHelper(idleTimes, durations, firstIndex-1, lastIndex);
278  }
279  else
280  {
281  if (lastIndex > firstIndex)
282  {
283  for (int idx = firstIndex; idx <= lastIndex; ++idx)
284  {
285  idleTimes[idx] = idleTimeSum / workingSize;
286  durations[idx] = utils::secondsToDuration(durationSum / workingSize);
287  }
288  }
289  return (firstIndex - 1);
290  }
291 }
292 
293 
295 (
297  const int& idx,
298  double& rate,
299  double& bandwidth
300 ) const
301 {
302  const double recentBinnedDuration = utils::durationToSeconds(stats.recentBinnedDurations[idx]);
303  if (recentBinnedDuration > 0)
304  {
305  rate =
306  stats.recentBinnedSampleCounts[idx] / recentBinnedDuration;
307 
308  bandwidth =
309  stats.recentBinnedValueSums[idx] / (1024*1024)
310  / recentBinnedDuration;
311  }
312 }
313 
314 
316 (
318  const int& idx
319 ) const
320 {
321  double busyPercentage;
322  if (stats.recentBinnedSampleCounts[idx] == 0)
323  {
324  // the thread did not log any idle time
325  busyPercentage = 100;
326  }
327  else if (stats.recentBinnedSampleCounts[idx] == 1)
328  {
329  // only one sample means that we waited a whole second on a queue
330  // this should only happen if deq_timed_wait timeout >= statistics calculation period
331  busyPercentage = 0;
332  }
333  else if (stats.recentBinnedValueSums[idx] <= utils::durationToSeconds(stats.recentBinnedDurations[idx]))
334  {
335  // the thread was busy while it was not idle during the whole reporting duration
336  busyPercentage = 100.0 * (1.0 - (stats.recentBinnedValueSums[idx] /
338  }
339  else
340  {
341  // the process logged more idle time than the whole reporting duration
342  // this can happen due to rounding issues.
343  busyPercentage = 0;
344  }
345 
346  return busyPercentage;
347 }
348 
349 
351 {
352  calcPoolUsage();
353 
354  if (fragmentQueue_.get() != 0) {
356  memoryUsedInFragmentQueueMQ_.addSample( static_cast<double>(fragmentQueue_->used()) / (1024*1024) );
357  }
358  if (streamQueue_.get() != 0) {
360  memoryUsedInStreamQueueMQ_.addSample( static_cast<double>(streamQueue_->used()) / (1024*1024) );
361  }
362  if (dqmEventQueue_.get() != 0) {
364  memoryUsedInDQMEventQueueMQ_.addSample( static_cast<double>(dqmEventQueue_->used()) / (1024*1024) );
365  }
368 
384 }
385 
386 
388 {
405 }
406 
407 
409 {
410  infoSpaceItems.push_back(std::make_pair("poolUsage", &poolUsage_));
411  infoSpaceItems.push_back(std::make_pair("entriesInFragmentQueue", &entriesInFragmentQueue_));
412  infoSpaceItems.push_back(std::make_pair("memoryUsedInFragmentQueue", &memoryUsedInFragmentQueue_));
413  infoSpaceItems.push_back(std::make_pair("fragmentQueueRate", &fragmentQueueRate_));
414  infoSpaceItems.push_back(std::make_pair("fragmentQueueBandwidth", &fragmentQueueBandwidth_));
415  infoSpaceItems.push_back(std::make_pair("fragmentStoreSize", &fragmentStoreSize_));
416  infoSpaceItems.push_back(std::make_pair("fragmentStoreMemoryUsed", &fragmentStoreMemoryUsed_));
417  infoSpaceItems.push_back(std::make_pair("entriesInStreamQueue", &entriesInStreamQueue_));
418  infoSpaceItems.push_back(std::make_pair("memoryUsedInStreamQueue", &memoryUsedInStreamQueue_));
419  infoSpaceItems.push_back(std::make_pair("streamQueueRate", &streamQueueRate_));
420  infoSpaceItems.push_back(std::make_pair("streamQueueBandwidth", &streamQueueBandwidth_));
421  infoSpaceItems.push_back(std::make_pair("writtenEventsRate", &writtenEventsRate_));
422  infoSpaceItems.push_back(std::make_pair("writtenEventsBandwidth", &writtenEventsBandwidth_));
423  infoSpaceItems.push_back(std::make_pair("entriesInDQMQueue", &entriesInDQMQueue_));
424  infoSpaceItems.push_back(std::make_pair("memoryUsedInDQMQueue", &memoryUsedInDQMQueue_));
425  infoSpaceItems.push_back(std::make_pair("dqmQueueRate", &dqmQueueRate_));
426  infoSpaceItems.push_back(std::make_pair("dqmQueueBandwidth", &dqmQueueBandwidth_));
427  infoSpaceItems.push_back(std::make_pair("fragmentProcessorBusy", &fragmentProcessorBusy_));
428  infoSpaceItems.push_back(std::make_pair("diskWriterBusy", &diskWriterBusy_));
429  infoSpaceItems.push_back(std::make_pair("dqmEventProcessorBusy", &dqmEventProcessorBusy_));
430  infoSpaceItems.push_back(std::make_pair("averagingTime", &averagingTime_));
431 }
432 
433 
435 {
436  Stats stats;
438 
439  poolUsage_ = static_cast<unsigned int>(stats.average.poolUsage);
440  entriesInFragmentQueue_ = static_cast<unsigned int>(stats.average.entriesInFragmentQueue);
444  fragmentStoreSize_ = static_cast<unsigned int>(stats.average.fragmentStoreSize);
446  entriesInStreamQueue_ = static_cast<unsigned int>(stats.average.entriesInStreamQueue);
452  entriesInDQMQueue_ = static_cast<unsigned int>(stats.average.entriesInDQMQueue);
460 }
461 
462 
464 duration(boost::posix_time::seconds(0)),
465 poolUsage(0),
466 entriesInFragmentQueue(0),
467 memoryUsedInFragmentQueue(0),
468 fragmentQueueRate(0),
469 fragmentQueueBandwidth(0),
470 fragmentStoreSize(0),
471 fragmentStoreMemoryUsed(0),
472 entriesInStreamQueue(0),
473 memoryUsedInStreamQueue(0),
474 streamQueueRate(0),
475 streamQueueBandwidth(0),
476 writtenEventsRate(0),
477 writtenEventsBandwidth(0),
478 entriesInDQMQueue(0),
479 memoryUsedInDQMQueue(0),
480 dqmQueueRate(0),
481 dqmQueueBandwidth(0),
482 fragmentProcessorBusy(0),
483 diskWriterBusy(0),
484 dqmEventProcessorBusy(0)
485 {}
486 
487 
490 {
491  duration = other.duration;
492  poolUsage = other.poolUsage;
493  entriesInFragmentQueue = other.entriesInFragmentQueue;
494  memoryUsedInFragmentQueue = other.memoryUsedInFragmentQueue;
495  fragmentQueueRate = other.fragmentQueueRate;
496  fragmentQueueBandwidth = other.fragmentQueueBandwidth;
497  fragmentStoreSize = other.fragmentStoreSize;
498  fragmentStoreMemoryUsed = other.fragmentStoreMemoryUsed;
499  entriesInStreamQueue = other.entriesInStreamQueue;
500  memoryUsedInStreamQueue = other.memoryUsedInStreamQueue;
501  streamQueueRate = other.streamQueueRate;
502  streamQueueBandwidth = other.streamQueueBandwidth;
503  writtenEventsRate = other.writtenEventsRate;
504  writtenEventsBandwidth = other.writtenEventsBandwidth;
505  entriesInDQMQueue = other.entriesInDQMQueue;
506  memoryUsedInDQMQueue = other.memoryUsedInDQMQueue;
507  dqmQueueRate = other.dqmQueueRate;
508  dqmQueueBandwidth = other.dqmQueueBandwidth;
509  fragmentProcessorBusy = other.fragmentProcessorBusy;
510  diskWriterBusy = other.diskWriterBusy;
511  dqmEventProcessorBusy = other.dqmEventProcessorBusy;
512 
513  return *this;
514 }
515 
516 
519 {
520  duration += other.duration;
521  poolUsage += other.poolUsage;
522  entriesInFragmentQueue += other.entriesInFragmentQueue;
523  memoryUsedInFragmentQueue += other.memoryUsedInFragmentQueue;
524  fragmentQueueRate += other.fragmentQueueRate;
525  fragmentQueueBandwidth += other.fragmentQueueBandwidth;
526  fragmentStoreSize += other.fragmentStoreSize;
527  fragmentStoreMemoryUsed += other.fragmentStoreMemoryUsed;
528  entriesInStreamQueue += other.entriesInStreamQueue;
529  memoryUsedInStreamQueue += other.memoryUsedInStreamQueue;
530  streamQueueRate += other.streamQueueRate;
531  streamQueueBandwidth += other.streamQueueBandwidth;
532  writtenEventsRate += other.writtenEventsRate;
533  writtenEventsBandwidth += other.writtenEventsBandwidth;
534  entriesInDQMQueue += other.entriesInDQMQueue;
535  memoryUsedInDQMQueue += other.memoryUsedInDQMQueue;
536  dqmQueueRate += other.dqmQueueRate;
537  dqmQueueBandwidth += other.dqmQueueBandwidth;
538  fragmentProcessorBusy += other.fragmentProcessorBusy;
539  diskWriterBusy += other.diskWriterBusy;
540  dqmEventProcessorBusy += other.dqmEventProcessorBusy;
541 
542  return *this;
543 }
544 
545 
548 {
549  poolUsage /= value;
550  entriesInFragmentQueue /= value;
551  memoryUsedInFragmentQueue /= value;
552  fragmentQueueRate /= value;
553  fragmentQueueBandwidth /= value;
554  fragmentStoreSize /= value;
555  fragmentStoreMemoryUsed /= value;
556  entriesInStreamQueue /= value;
557  memoryUsedInStreamQueue /= value;
558  streamQueueRate /= value;
559  streamQueueBandwidth /= value;
560  writtenEventsRate /= value;
561  writtenEventsBandwidth /= value;
562  entriesInDQMQueue /= value;
563  memoryUsedInDQMQueue /= value;
564  dqmQueueRate /= value;
565  dqmQueueBandwidth /= value;
566  fragmentProcessorBusy /= value;
567  diskWriterBusy /= value;
568  dqmEventProcessorBusy /= value;
569 
570  return *this;
571 }
572 
573 
575 {
576  snapshots.clear();
577  Snapshot empty;
578  average = empty;
579 }
580 
double seconds()
ThroughputMonitorCollection(const utils::Duration_t &updateInterval, const unsigned int &throuphputAveragingCycles)
void addSample(const double &value=1)
std::vector< uint64_t > recentBinnedSampleCounts
Duration_t secondsToDuration(double const &seconds)
Definition: Utils.h:140
void getStats(Stats &stats) const
std::vector< utils::TimePoint_t > recentBinnedSnapshotTimes
int smoothIdleTimesHelper(std::vector< double > &idleTimes, std::vector< utils::Duration_t > &durations, int firstIndex, int lastIndex) const
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
void calculateStatistics(const utils::TimePoint_t &currentTime=utils::getCurrentTime())
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
double calcBusyPercentage(MonitoredQuantity::Stats &, const int &idx) const
void do_getStats(Stats &, const unsigned int sampleCount) const
void setMemoryPoolPointer(toolbox::mem::Pool *)
void getRateAndBandwidth(MonitoredQuantity::Stats &stats, const int &idx, double &rate, double &bandwidth) const
std::vector< double > recentBinnedValueSums
void addDiskWriterIdleSample(utils::Duration_t idleTime)
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void smoothIdleTimes(MonitoredQuantity::Stats &) const
std::vector< utils::Duration_t > recentBinnedDurations
void addFragmentProcessorIdleSample(utils::Duration_t idleTime)
double durationToSeconds(Duration_t const &)
Definition: Utils.h:147
void addDQMEventProcessorIdleSample(utils::Duration_t idleTime)