00001 // $Id: MonitoredQuantity.cc,v 1.13 2011/04/07 08:01:40 mommsen Exp $ 00003 00004 #include "EventFilter/StorageManager/interface/MonitoredQuantity.h" 00005 00006 #include <algorithm> 00007 #include <math.h> 00008 00009 00010 namespace stor { 00011 00012 MonitoredQuantity::MonitoredQuantity 00013 ( 00014 utils::Duration_t expectedCalculationInterval, 00015 utils::Duration_t timeWindowForRecentResults 00016 ): 00017 enabled_(true), 00018 expectedCalculationInterval_(expectedCalculationInterval) 00019 { 00020 setNewTimeWindowForRecentResults(timeWindowForRecentResults); 00021 } 00022 00023 void MonitoredQuantity::addSample(const double& value) 00024 { 00025 if (! enabled_) {return;} 00026 00027 boost::mutex::scoped_lock sl(accumulationMutex_); 00028 00029 if ( lastCalculationTime_.is_not_a_date_time() ) 00030 { 00031 lastCalculationTime_ = utils::getCurrentTime(); 00032 } 00033 00034 ++workingSampleCount_; 00035 workingValueSum_ += value; 00036 workingValueSumOfSquares_ += (value * value); 00037 00038 if (value < workingValueMin_) workingValueMin_ = value; 00039 if (value > workingValueMax_) workingValueMax_ = value; 00040 00041 workingLastSampleValue_ = value; 00042 } 00043 00044 void MonitoredQuantity::addSample(const int& value) 00045 { 00046 addSample(static_cast<double>(value)); 00047 } 00048 00049 void MonitoredQuantity::addSample(const unsigned int& value) 00050 { 00051 addSample(static_cast<double>(value)); 00052 } 00053 00054 void MonitoredQuantity::addSample(const long& value) 00055 { 00056 addSample(static_cast<double>(value)); 00057 } 00058 00059 void MonitoredQuantity::addSample(const unsigned long& value) 00060 { 00061 addSample(static_cast<double>(value)); 00062 } 00063 00064 void MonitoredQuantity::addSample(const long long& value) 00065 { 00066 addSample(static_cast<double>(value)); 00067 } 00068 00069 void MonitoredQuantity::addSample(const unsigned long long& value) 00070 { 00071 addSample(static_cast<double>(value)); 00072 } 00073 00074 void MonitoredQuantity::addSampleIfLarger(const double& value) 00075 { 00076 if (value > workingLastSampleValue_) 00077 addSample(value); 00078 } 00079 00080 void MonitoredQuantity::calculateStatistics(const utils::TimePoint_t& currentTime) 00081 { 00082 if (! enabled_) {return;} 00083 00084 // create local copies of the working values to minimize the 00085 // time that we could block a thread trying to add a sample. 00086 // Also, reset the working values. 00087 long long latestSampleCount; 00088 double latestValueSum; 00089 double latestValueSumOfSquares; 00090 double latestValueMin; 00091 double latestValueMax; 00092 utils::Duration_t latestDuration; 00093 utils::TimePoint_t latestSnapshotTime; 00094 double latestLastLatchedSampleValue; 00095 { 00096 boost::mutex::scoped_lock sl(accumulationMutex_); 00097 00098 if (lastCalculationTime_.is_not_a_date_time()) {return;} 00099 if (currentTime - lastCalculationTime_ < expectedCalculationInterval_) {return;} 00100 00101 latestSampleCount = workingSampleCount_; 00102 latestValueSum = workingValueSum_; 00103 latestValueSumOfSquares = workingValueSumOfSquares_; 00104 latestValueMin = workingValueMin_; 00105 latestValueMax = workingValueMax_; 00106 latestDuration = currentTime - lastCalculationTime_; 00107 latestSnapshotTime = currentTime; 00108 latestLastLatchedSampleValue = workingLastSampleValue_; 00109 00110 lastCalculationTime_ = currentTime; 00111 workingSampleCount_ = 0; 00112 workingValueSum_ = 0.0; 00113 workingValueSumOfSquares_ = 0.0; 00114 workingValueMin_ = INFINITY; 00115 workingValueMax_ = -INFINITY; 00116 } 00117 00118 // lock out any interaction with the results while we update them 00119 { 00120 boost::mutex::scoped_lock sl(resultsMutex_); 00121 lastLatchedSampleValue_ = latestLastLatchedSampleValue; 00122 00123 // we simply add the latest results to the full set 00124 fullSampleCount_ += latestSampleCount; 00125 fullValueSum_ += latestValueSum; 00126 fullValueSumOfSquares_ += latestValueSumOfSquares; 00127 if (latestValueMin < fullValueMin_) {fullValueMin_ = latestValueMin;} 00128 if (latestValueMax > fullValueMax_) {fullValueMax_ = latestValueMax;} 00129 fullDuration_ += latestDuration; 00130 00131 // for the recent results, we need to replace the contents of 00132 // the working bin and re-calculate the recent values 00133 binSampleCount_[workingBinId_] = latestSampleCount; 00134 binValueSum_[workingBinId_] = latestValueSum; 00135 binValueSumOfSquares_[workingBinId_] = latestValueSumOfSquares; 00136 binValueMin_[workingBinId_] = latestValueMin; 00137 binValueMax_[workingBinId_] = latestValueMax; 00138 binDuration_[workingBinId_] = latestDuration; 00139 binSnapshotTime_[workingBinId_] = latestSnapshotTime; 00140 00141 lastLatchedValueRate_ = latestValueSum / utils::durationToSeconds(latestDuration); 00142 00143 recentSampleCount_ = 0; 00144 recentValueSum_ = 0.0; 00145 recentValueSumOfSquares_ = 0.0; 00146 recentValueMin_ = INFINITY; 00147 recentValueMax_ = -INFINITY; 00148 recentDuration_ = boost::posix_time::seconds(0); 00149 00150 for (unsigned int idx = 0; idx < binCount_; ++idx) { 00151 recentSampleCount_ += binSampleCount_[idx]; 00152 recentValueSum_ += binValueSum_[idx]; 00153 recentValueSumOfSquares_ += binValueSumOfSquares_[idx]; 00154 if (binValueMin_[idx] < recentValueMin_) { 00155 recentValueMin_ = binValueMin_[idx]; 00156 } 00157 if (binValueMax_[idx] > recentValueMax_) { 00158 recentValueMax_ = binValueMax_[idx]; 00159 } 00160 recentDuration_ += binDuration_[idx]; 00161 } 00162 00163 // update the working bin ID here so that we are ready for 00164 // the next calculation request 00165 ++workingBinId_; 00166 if (workingBinId_ >= binCount_) {workingBinId_ = 0;} 00167 00168 // calculate the derived full values 00169 const double fullDuration = utils::durationToSeconds(fullDuration_); 00170 fullSampleRate_ = fullSampleCount_ / fullDuration; 00171 fullValueRate_ = fullValueSum_ / fullDuration; 00172 00173 if (fullSampleCount_ > 0) { 00174 fullValueAverage_ = fullValueSum_ / static_cast<double>(fullSampleCount_); 00175 00176 double squareAvg = fullValueSumOfSquares_ / static_cast<double>(fullSampleCount_); 00177 double avg = fullValueSum_ / static_cast<double>(fullSampleCount_); 00178 double sigSquared = squareAvg - avg*avg; 00179 if(sigSquared > 0.0) { 00180 fullValueRMS_ = sqrt(sigSquared); 00181 } 00182 else { 00183 fullValueRMS_ = 0.0; 00184 } 00185 } 00186 else { 00187 fullValueAverage_ = 0.0; 00188 fullValueRMS_ = 0.0; 00189 } 00190 00191 // calculate the derived recent values 00192 const double recentDuration = utils::durationToSeconds(recentDuration_); 00193 if (recentDuration > 0) { 00194 recentSampleRate_ = recentSampleCount_ / recentDuration; 00195 recentValueRate_ = recentValueSum_ / recentDuration; 00196 } 00197 else { 00198 recentSampleRate_ = 0.0; 00199 recentValueRate_ = 0.0; 00200 } 00201 00202 if (recentSampleCount_ > 0) { 00203 recentValueAverage_ = recentValueSum_ / static_cast<double>(recentSampleCount_); 00204 00205 double squareAvg = recentValueSumOfSquares_ / 00206 static_cast<double>(recentSampleCount_); 00207 double avg = recentValueSum_ / static_cast<double>(recentSampleCount_); 00208 double sigSquared = squareAvg - avg*avg; 00209 if(sigSquared > 0.0) { 00210 recentValueRMS_ = sqrt(sigSquared); 00211 } 00212 else { 00213 recentValueRMS_ = 0.0; 00214 } 00215 } 00216 else { 00217 recentValueAverage_ = 0.0; 00218 recentValueRMS_ = 0.0; 00219 } 00220 } 00221 } 00222 00223 void MonitoredQuantity::resetAccumulators() 00224 { 00225 lastCalculationTime_ = boost::posix_time::not_a_date_time; 00226 workingSampleCount_ = 0; 00227 workingValueSum_ = 0.0; 00228 workingValueSumOfSquares_ = 0.0; 00229 workingValueMin_ = INFINITY; 00230 workingValueMax_ = -INFINITY; 00231 workingLastSampleValue_ = 0; 00232 } 00233 00234 void MonitoredQuantity::resetResults() 00235 { 00236 workingBinId_ = 0; 00237 for (unsigned int idx = 0; idx < binCount_; ++idx) { 00238 binSampleCount_[idx] = 0; 00239 binValueSum_[idx] = 0.0; 00240 binValueSumOfSquares_[idx] = 0.0; 00241 binValueMin_[idx] = INFINITY; 00242 binValueMax_[idx] = -INFINITY; 00243 binDuration_[idx] = boost::posix_time::seconds(0); 00244 binSnapshotTime_[idx] = boost::posix_time::not_a_date_time; 00245 } 00246 00247 fullSampleCount_ = 0; 00248 fullSampleRate_ = 0.0; 00249 fullValueSum_ = 0.0; 00250 fullValueSumOfSquares_ = 0.0; 00251 fullValueAverage_ = 0.0; 00252 fullValueRMS_ = 0.0; 00253 fullValueMin_ = INFINITY; 00254 fullValueMax_ = -INFINITY; 00255 fullValueRate_ = 0.0; 00256 fullDuration_ = boost::posix_time::seconds(0); 00257 00258 recentSampleCount_ = 0; 00259 recentSampleRate_ = 0.0; 00260 recentValueSum_ = 0.0; 00261 recentValueSumOfSquares_ = 0.0; 00262 recentValueAverage_ = 0.0; 00263 recentValueRMS_ = 0.0; 00264 recentValueMin_ = INFINITY; 00265 recentValueMax_ = -INFINITY; 00266 recentValueRate_ = 0.0; 00267 recentDuration_ = boost::posix_time::seconds(0); 00268 lastLatchedSampleValue_ = 0.0; 00269 lastLatchedValueRate_ = 0.0; 00270 } 00271 00272 void MonitoredQuantity::reset() 00273 { 00274 { 00275 boost::mutex::scoped_lock sl(accumulationMutex_); 00276 resetAccumulators(); 00277 } 00278 00279 { 00280 boost::mutex::scoped_lock sl(resultsMutex_); 00281 resetResults(); 00282 } 00283 } 00284 00285 void MonitoredQuantity::enable() 00286 { 00287 if (! enabled_) { 00288 reset(); 00289 enabled_ = true; 00290 } 00291 } 00292 00293 void MonitoredQuantity::disable() 00294 { 00295 // It is faster to just set enabled_ to false than to test and set 00296 // it conditionally. 00297 enabled_ = false; 00298 } 00299 00300 void MonitoredQuantity::setNewTimeWindowForRecentResults(const utils::Duration_t& interval) 00301 { 00302 // lock the results objects since we're dramatically changing the 00303 // bins used for the recent results 00304 { 00305 boost::mutex::scoped_lock sl(resultsMutex_); 00306 00307 intervalForRecentStats_ = interval; 00308 00309 // determine how many bins we should use in our sliding window 00310 // by dividing the input time window by the expected calculation 00311 // interval and rounding to the nearest integer. 00312 // In case that the calculation interval is larger then the 00313 // interval for recent stats, keep the last one. 00314 binCount_ = std::max(1U, 00315 static_cast<unsigned int>( 00316 (intervalForRecentStats_.total_nanoseconds() / expectedCalculationInterval_.total_nanoseconds()) + 0.5 00317 ) 00318 ); 00319 00320 // create the vectors for the binned quantities 00321 binSampleCount_.reserve(binCount_); 00322 binValueSum_.reserve(binCount_); 00323 binValueSumOfSquares_.reserve(binCount_); 00324 binValueMin_.reserve(binCount_); 00325 binValueMax_.reserve(binCount_); 00326 binDuration_.reserve(binCount_); 00327 binSnapshotTime_.reserve(binCount_); 00328 00329 resetResults(); 00330 } 00331 00332 { 00333 boost::mutex::scoped_lock sl(accumulationMutex_); 00334 resetAccumulators(); 00335 } 00336 00337 // call the reset method to populate the correct initial values 00338 // for the internal sample data 00339 //reset(); 00340 } 00341 00342 void 00343 MonitoredQuantity::getStats(Stats& s) const 00344 { 00345 boost::mutex::scoped_lock results(resultsMutex_); 00346 00347 s.fullSampleCount = fullSampleCount_; 00348 s.fullSampleRate = fullSampleRate_; 00349 s.fullValueSum = fullValueSum_; 00350 s.fullValueSumOfSquares = fullValueSumOfSquares_; 00351 s.fullValueAverage = fullValueAverage_; 00352 s.fullValueRMS = fullValueRMS_; 00353 s.fullValueMin = fullValueMin_; 00354 s.fullValueMax = fullValueMax_; 00355 s.fullValueRate = fullValueRate_; 00356 s.fullDuration = fullDuration_; 00357 00358 s.recentSampleCount = recentSampleCount_; 00359 s.recentSampleRate = recentSampleRate_; 00360 s.recentValueSum = recentValueSum_; 00361 s.recentValueSumOfSquares = recentValueSumOfSquares_; 00362 s.recentValueAverage = recentValueAverage_; 00363 s.recentValueRMS = recentValueRMS_; 00364 s.recentValueMin = recentValueMin_; 00365 s.recentValueMax = recentValueMax_; 00366 s.recentValueRate = recentValueRate_; 00367 s.recentDuration = recentDuration_; 00368 00369 s.recentBinnedSampleCounts.resize(binCount_); 00370 s.recentBinnedValueSums.resize(binCount_); 00371 s.recentBinnedDurations.resize(binCount_); 00372 s.recentBinnedSnapshotTimes.resize(binCount_); 00373 uint32_t sourceBinId = workingBinId_; 00374 for (uint32_t idx = 0; idx < binCount_; ++idx) { 00375 if (sourceBinId >= binCount_) {sourceBinId = 0;} 00376 s.recentBinnedSampleCounts[idx] = binSampleCount_[sourceBinId]; 00377 s.recentBinnedValueSums[idx] = binValueSum_[sourceBinId]; 00378 s.recentBinnedDurations[idx] = binDuration_[sourceBinId]; 00379 s.recentBinnedSnapshotTimes[idx] = binSnapshotTime_[sourceBinId]; 00380 ++sourceBinId; 00381 } 00382 00383 s.lastSampleValue = lastLatchedSampleValue_; 00384 s.lastValueRate = lastLatchedValueRate_; 00385 s.enabled = enabled_; 00386 } 00387 00388 } // namespace stor 00389 00390