CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MonitoredQuantity.cc
Go to the documentation of this file.
1 // $Id: MonitoredQuantity.cc,v 1.13 2011/04/07 08:01:40 mommsen Exp $
3 
5 
6 #include <algorithm>
7 #include <math.h>
8 
9 
10 namespace stor {
11 
13  (
14  utils::Duration_t expectedCalculationInterval,
15  utils::Duration_t timeWindowForRecentResults
16  ):
17  enabled_(true),
18  expectedCalculationInterval_(expectedCalculationInterval)
19  {
    // Constructor (the header line naming it is elided in this listing).
    // The object starts enabled and remembers the expected interval between
    // statistics calculations. All bin/result set-up is delegated to
    // setNewTimeWindowForRecentResults(), which derives the number of
    // "recent" bins from the requested time window.
20  setNewTimeWindowForRecentResults(timeWindowForRecentResults);
21  }
22 
23  void MonitoredQuantity::addSample(const double& value)
24  {
    // Records one sample into the working accumulators.
    // Does nothing while the quantity is disabled. Note that enabled_ is
    // tested before the lock is taken.
25  if (! enabled_) {return;}
26 
    // Held until the end of the function, so every update below is
    // serialized against other code that takes accumulationMutex_.
27  boost::mutex::scoped_lock sl(accumulationMutex_);
28 
    // First sample since construction/reset: lastCalculationTime_ is still
    // "not a date time". The body of this branch (listing line 31) is elided
    // here; presumably it stamps the start time -- TODO confirm.
29  if ( lastCalculationTime_.is_not_a_date_time() )
30  {
32  }
33 
    // Listing lines 34-35, 38-39 and 41 are elided; only the sum-of-squares
    // update is visible. Presumably the elided lines update the count, sum,
    // min/max and last-sample value -- TODO confirm against the real file.
36  workingValueSumOfSquares_ += (value * value);
37 
40 
42  }
43 
45  {
    // Convenience overload whose signature line is elided in this listing
    // (presumably addSample(const int&) -- TODO confirm). Converts the value
    // to double and forwards it to addSample(const double&).
46  addSample(static_cast<double>(value));
47  }
48 
49  void MonitoredQuantity::addSample(const unsigned int& value)
50  {
    // Convenience overload: converts the unsigned int sample to double and
    // forwards it to addSample(const double&), which does the bookkeeping.
51  addSample(static_cast<double>(value));
52  }
53 
55  {
    // Convenience overload whose signature line is elided in this listing
    // (presumably addSample(const long&) -- TODO confirm). Converts the value
    // to double and forwards it to addSample(const double&).
56  addSample(static_cast<double>(value));
57  }
58 
59  void MonitoredQuantity::addSample(const unsigned long& value)
60  {
    // Convenience overload: converts the unsigned long sample to double and
    // forwards it to addSample(const double&). Values that a double cannot
    // represent exactly are rounded by the conversion.
61  addSample(static_cast<double>(value));
62  }
63 
64  void MonitoredQuantity::addSample(const long long& value)
65  {
    // Convenience overload: converts the long long sample to double and
    // forwards it to addSample(const double&). Values that a double cannot
    // represent exactly are rounded by the conversion.
66  addSample(static_cast<double>(value));
67  }
68 
69  void MonitoredQuantity::addSample(const unsigned long long& value)
70  {
    // Convenience overload: converts the unsigned long long sample to double
    // and forwards it to addSample(const double&). Values that a double
    // cannot represent exactly are rounded by the conversion.
71  addSample(static_cast<double>(value));
72  }
73 
75  {
    // Body of addSampleIfLarger (signature line elided in this listing).
    // Records the value only when it is strictly greater than the most
    // recently stored working sample; equal or smaller values are dropped.
    // NOTE(review): workingLastSampleValue_ is read here without taking
    // accumulationMutex_, while addSample() locks internally. The compare
    // and the add are therefore not one atomic step -- confirm this is
    // acceptable for concurrent callers.
76  if (value > workingLastSampleValue_)
77  addSample(value);
78  }
79 
81  {
    // Body of calculateStatistics (signature line elided in this listing;
    // currentTime is its time-point argument). It works in two phases:
    //   1. under accumulationMutex_: snapshot the working accumulators and
    //      reset them;
    //   2. under resultsMutex_: fold the snapshot into the full-lifetime
    //      totals, store it in the current bin of the sliding window, and
    //      recompute the derived "full" and "recent" rates/averages/RMS.
    // Several statements are elided in this listing (lines 111, 113, 145,
    // 148, 151-153, 155, 158, 160); comments below only describe what is
    // visible.
82  if (! enabled_) {return;}
83 
84  // create local copies of the working values to minimize the
85  // time that we could block a thread trying to add a sample.
86  // Also, reset the working values.
87  long long latestSampleCount;
88  double latestValueSum;
89  double latestValueSumOfSquares;
90  double latestValueMin;
91  double latestValueMax;
92  utils::Duration_t latestDuration;
93  utils::TimePoint_t latestSnapshotTime;
94  double latestLastLatchedSampleValue;
95  {
96  boost::mutex::scoped_lock sl(accumulationMutex_);
97 
    // Nothing to do while lastCalculationTime_ is still unset, or when
    // less than the expected calculation interval has elapsed since the
    // previous calculation.
98  if (lastCalculationTime_.is_not_a_date_time()) {return;}
99  if (currentTime - lastCalculationTime_ < expectedCalculationInterval_) {return;}
100 
101  latestSampleCount = workingSampleCount_;
102  latestValueSum = workingValueSum_;
103  latestValueSumOfSquares = workingValueSumOfSquares_;
104  latestValueMin = workingValueMin_;
105  latestValueMax = workingValueMax_;
106  latestDuration = currentTime - lastCalculationTime_;
107  latestSnapshotTime = currentTime;
108  latestLastLatchedSampleValue = workingLastSampleValue_;
109 
    // Start a fresh accumulation period. Listing lines 111 and 113 are
    // elided (presumably the count and sum-of-squares resets -- TODO
    // confirm). Min/max are reset to +/-INFINITY so that any sample
    // replaces them.
110  lastCalculationTime_ = currentTime;
112  workingValueSum_ = 0.0;
114  workingValueMin_ = INFINITY;
115  workingValueMax_ = -INFINITY;
116  }
117 
118  // lock out any interaction with the results while we update them
119  {
120  boost::mutex::scoped_lock sl(resultsMutex_);
121  lastLatchedSampleValue_ = latestLastLatchedSampleValue;
122 
123  // we simply add the latest results to the full set
124  fullSampleCount_ += latestSampleCount;
125  fullValueSum_ += latestValueSum;
126  fullValueSumOfSquares_ += latestValueSumOfSquares;
127  if (latestValueMin < fullValueMin_) {fullValueMin_ = latestValueMin;}
128  if (latestValueMax > fullValueMax_) {fullValueMax_ = latestValueMax;}
129  fullDuration_ += latestDuration;
130 
131  // for the recent results, we need to replace the contents of
132  // the working bin and re-calculate the recent values
133  binSampleCount_[workingBinId_] = latestSampleCount;
134  binValueSum_[workingBinId_] = latestValueSum;
135  binValueSumOfSquares_[workingBinId_] = latestValueSumOfSquares;
136  binValueMin_[workingBinId_] = latestValueMin;
137  binValueMax_[workingBinId_] = latestValueMax;
138  binDuration_[workingBinId_] = latestDuration;
139  binSnapshotTime_[workingBinId_] = latestSnapshotTime;
140 
    // Value rate over just this latest period.
    // NOTE(review): the divisor is only guaranteed non-zero by the
    // interval check above if expectedCalculationInterval_ is positive
    // -- confirm.
141  lastLatchedValueRate_ = latestValueSum / utils::durationToSeconds(latestDuration);
142 
    // Rebuild the "recent" aggregates from scratch over all bins.
    // The resets on listing lines 145 and 148 and most of the loop body
    // (lines 151-153, 155, 158, 160) are elided; only the min/max
    // comparisons are visible.
143  recentSampleCount_ = 0;
144  recentValueSum_ = 0.0;
146  recentValueMin_ = INFINITY;
147  recentValueMax_ = -INFINITY;
149 
150  for (unsigned int idx = 0; idx < binCount_; ++idx) {
154  if (binValueMin_[idx] < recentValueMin_) {
156  }
157  if (binValueMax_[idx] > recentValueMax_) {
159  }
161  }
162 
163  // update the working bin ID here so that we are ready for
164  // the next calculation request
165  ++workingBinId_;
166  if (workingBinId_ >= binCount_) {workingBinId_ = 0;}
167 
168  // calculate the derived full values
    // NOTE(review): unlike the recent-values section below, these two
    // divisions are not guarded against fullDuration == 0 -- confirm
    // that cannot occur.
169  const double fullDuration = utils::durationToSeconds(fullDuration_);
170  fullSampleRate_ = fullSampleCount_ / fullDuration;
171  fullValueRate_ = fullValueSum_ / fullDuration;
172 
    // The value stored as "RMS" is sqrt(<x^2> - <x>^2), i.e. the spread
    // about the mean. A non-positive variance (e.g. from floating-point
    // rounding) is reported as 0 instead of being passed to sqrt().
173  if (fullSampleCount_ > 0) {
174  fullValueAverage_ = fullValueSum_ / static_cast<double>(fullSampleCount_);
175 
176  double squareAvg = fullValueSumOfSquares_ / static_cast<double>(fullSampleCount_);
177  double avg = fullValueSum_ / static_cast<double>(fullSampleCount_);
178  double sigSquared = squareAvg - avg*avg;
179  if(sigSquared > 0.0) {
180  fullValueRMS_ = sqrt(sigSquared);
181  }
182  else {
183  fullValueRMS_ = 0.0;
184  }
185  }
186  else {
187  fullValueAverage_ = 0.0;
188  fullValueRMS_ = 0.0;
189  }
190 
191  // calculate the derived recent values
    // Rates fall back to 0 when the recent window has zero duration.
192  const double recentDuration = utils::durationToSeconds(recentDuration_);
193  if (recentDuration > 0) {
194  recentSampleRate_ = recentSampleCount_ / recentDuration;
195  recentValueRate_ = recentValueSum_ / recentDuration;
196  }
197  else {
198  recentSampleRate_ = 0.0;
199  recentValueRate_ = 0.0;
200  }
201 
    // Same average / spread computation as for the full values, restricted
    // to the samples currently inside the sliding window.
202  if (recentSampleCount_ > 0) {
203  recentValueAverage_ = recentValueSum_ / static_cast<double>(recentSampleCount_);
204 
205  double squareAvg = recentValueSumOfSquares_ /
206  static_cast<double>(recentSampleCount_);
207  double avg = recentValueSum_ / static_cast<double>(recentSampleCount_);
208  double sigSquared = squareAvg - avg*avg;
209  if(sigSquared > 0.0) {
210  recentValueRMS_ = sqrt(sigSquared);
211  }
212  else {
213  recentValueRMS_ = 0.0;
214  }
215  }
216  else {
217  recentValueAverage_ = 0.0;
218  recentValueRMS_ = 0.0;
219  }
220  }
221  }
222 
224  {
     // Resets the working (accumulation-side) state; the signature line is
     // elided in this listing, so the function name is not visible here.
     // lastCalculationTime_ goes back to "not a date time", the running sum
     // to 0 and the running min/max to +/-INFINITY.
     // Listing lines 226, 228 and 231 are elided (presumably the remaining
     // working members -- TODO confirm).
     // No mutex is taken in the visible body; presumably the caller is
     // expected to hold accumulationMutex_ -- TODO confirm.
225  lastCalculationTime_ = boost::posix_time::not_a_date_time;
227  workingValueSum_ = 0.0;
229  workingValueMin_ = INFINITY;
230  workingValueMax_ = -INFINITY;
232  }
233 
235  {
     // Resets the results-side state; the signature line is elided in this
     // listing (presumably resetResults(), which other blocks call -- TODO
     // confirm). No mutex is taken in the visible body; the visible callers
     // of resetResults() hold resultsMutex_ around the call.
     // Listing lines 243, 250, 256, 261 and 267-268 are elided.

     // Clear every bin of the sliding window and restart at bin 0.
     // Min/max use +/-INFINITY so the first real sample replaces them, and
     // snapshot times are marked as "not a date time".
     // NOTE(review): the bins are written with operator[], which requires
     // each vector's size() to be at least binCount_ -- confirm.
236  workingBinId_ = 0;
237  for (unsigned int idx = 0; idx < binCount_; ++idx) {
238  binSampleCount_[idx] = 0;
239  binValueSum_[idx] = 0.0;
240  binValueSumOfSquares_[idx] = 0.0;
241  binValueMin_[idx] = INFINITY;
242  binValueMax_[idx] = -INFINITY;
244  binSnapshotTime_[idx] = boost::posix_time::not_a_date_time;
245  }
246 
     // Full-lifetime aggregates.
247  fullSampleCount_ = 0;
248  fullSampleRate_ = 0.0;
249  fullValueSum_ = 0.0;
251  fullValueAverage_ = 0.0;
252  fullValueRMS_ = 0.0;
253  fullValueMin_ = INFINITY;
254  fullValueMax_ = -INFINITY;
255  fullValueRate_ = 0.0;
257 
     // Sliding-window ("recent") aggregates and the latched rate.
258  recentSampleCount_ = 0;
259  recentSampleRate_ = 0.0;
260  recentValueSum_ = 0.0;
262  recentValueAverage_ = 0.0;
263  recentValueRMS_ = 0.0;
264  recentValueMin_ = INFINITY;
265  recentValueMax_ = -INFINITY;
266  recentValueRate_ = 0.0;
269  lastLatchedValueRate_ = 0.0;
270  }
271 
273  {
     // Full reset; the signature line is elided in this listing (presumably
     // reset() -- TODO confirm). The two mutexes are taken one after the
     // other in separate scopes, never both at once.
274  {
     // Listing line 276 is elided; presumably it resets the working
     // accumulators while accumulationMutex_ is held -- TODO confirm.
275  boost::mutex::scoped_lock sl(accumulationMutex_);
277  }
278 
279  {
     // Clear bins and derived results under the results lock.
280  boost::mutex::scoped_lock sl(resultsMutex_);
281  resetResults();
282  }
283  }
284 
286  {
     // Re-enables the quantity; the signature line is elided in this
     // listing (presumably enable() -- TODO confirm).
     // Only when currently disabled: call reset() first, then set the
     // enabled flag. Calling this while already enabled does nothing.
287  if (! enabled_) {
288  reset();
289  enabled_ = true;
290  }
291  }
292 
294  {
     // Disables the quantity; the signature line is elided in this listing
     // (presumably disable() -- TODO confirm). Already-computed results are
     // left untouched here.
295  // It is faster to just set enabled_ to false than to test and set
296  // it conditionally.
297  enabled_ = false;
298  }
299 
301  {
     // Body of setNewTimeWindowForRecentResults (signature line elided in
     // this listing). Recomputes how many bins make up the sliding window
     // for "recent" results, prepares the per-bin vectors and clears all
     // results. Listing lines 307, 323 and 334 are elided.
302  // lock the results objects since we're dramatically changing the
303  // bins used for the recent results
304  {
305  boost::mutex::scoped_lock sl(resultsMutex_);
306 
308 
309  // determine how many bins we should use in our sliding window
310  // by dividing the input time window by the expected calculation
311  // interval and rounding to the nearest integer.
312  // In case that the calculation interval is larger than the
313  // interval for recent stats, keep the last one.
     // NOTE(review): in Boost.Date_Time total_nanoseconds() returns an
     // integral tick count, so the quotient below is truncated by integer
     // division before 0.5 is added; the cast then yields the floor of the
     // ratio, not the nearest integer the comment describes -- confirm.
     // NOTE(review): a zero expectedCalculationInterval_ would divide by
     // zero here -- confirm callers never pass one.
314  binCount_ = std::max(1U,
315  static_cast<unsigned int>(
316  (intervalForRecentStats_.total_nanoseconds() / expectedCalculationInterval_.total_nanoseconds()) + 0.5
317  )
318  );
319 
320  // create the vectors for the binned quantities
     // NOTE(review): reserve() only changes capacity, not size(). The
     // bin vectors are later written with operator[] up to binCount_, so
     // unless they are resized elsewhere (not visible in this listing)
     // those writes are out of bounds -- confirm; resize() may be what
     // is intended.
321  binSampleCount_.reserve(binCount_);
322  binValueSum_.reserve(binCount_);
324  binValueMin_.reserve(binCount_);
325  binValueMax_.reserve(binCount_);
326  binDuration_.reserve(binCount_);
327  binSnapshotTime_.reserve(binCount_);
328 
329  resetResults();
330  }
331 
332  {
     // Listing line 334 is elided; presumably it resets the working
     // accumulators while accumulationMutex_ is held -- TODO confirm.
333  boost::mutex::scoped_lock sl(accumulationMutex_);
335  }
336 
337  // call the reset method to populate the correct initial values
338  // for the internal sample data
339  //reset();
340  }
341 
342  void
344  {
     // Body of getStats (the line carrying the name and parameter list is
     // elided in this listing; `s` is the output Stats object).
     // The whole copy is done under resultsMutex_. Listing lines 347-356,
     // 358-367, 369-372 and 383-384 are elided -- presumably the scalar
     // full/recent results and the sizing of the s.recentBinned* vectors
     // -- TODO confirm.
345  boost::mutex::scoped_lock results(resultsMutex_);
346 
357 
368 
     // Copy the ring of bins into s in rotated order: output index 0 is the
     // bin at workingBinId_ (the one the next calculation will overwrite),
     // and the source index wraps to 0 once it reaches binCount_.
373  uint32_t sourceBinId = workingBinId_;
374  for (uint32_t idx = 0; idx < binCount_; ++idx) {
375  if (sourceBinId >= binCount_) {sourceBinId = 0;}
376  s.recentBinnedSampleCounts[idx] = binSampleCount_[sourceBinId];
377  s.recentBinnedValueSums[idx] = binValueSum_[sourceBinId];
378  s.recentBinnedDurations[idx] = binDuration_[sourceBinId];
379  s.recentBinnedSnapshotTimes[idx] = binSnapshotTime_[sourceBinId];
380  ++sourceBinId;
381  }
382 
385  s.enabled = enabled_;
386  }
387 
388 } // namespace stor
389 
390 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
utils::Duration_t intervalForRecentStats_
utils::Duration_t recentDuration_
std::vector< utils::TimePoint_t > binSnapshotTime_
double seconds()
void setNewTimeWindowForRecentResults(const utils::Duration_t &interval)
tuple interval
Definition: MergeJob_cfg.py:20
void addSample(const double &value=1)
std::vector< uint64_t > recentBinnedSampleCounts
std::vector< uint64_t > binSampleCount_
void getStats(Stats &stats) const
std::vector< double > binValueSum_
std::vector< utils::TimePoint_t > recentBinnedSnapshotTimes
const utils::Duration_t expectedCalculationInterval_
std::vector< utils::Duration_t > binDuration_
std::vector< double > binValueMax_
void calculateStatistics(const utils::TimePoint_t &currentTime=utils::getCurrentTime())
utils::Duration_t fullDuration_
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
const T & max(const T &a, const T &b)
T sqrt(T t)
Definition: SSEVec.h:46
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
std::vector< double > recentBinnedValueSums
utils::TimePoint_t lastCalculationTime_
std::vector< utils::Duration_t > recentBinnedDurations
double durationToSeconds(Duration_t const &)
Definition: Utils.h:147
void addSampleIfLarger(const double &value)
MonitoredQuantity(utils::Duration_t expectedCalculationInterval, utils::Duration_t timeWindowForRecentResults)
std::vector< double > binValueMin_
std::vector< double > binValueSumOfSquares_