CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/EventFilter/StorageManager/src/MonitoredQuantity.cc

Go to the documentation of this file.
00001 // $Id: MonitoredQuantity.cc,v 1.13 2011/04/07 08:01:40 mommsen Exp $
00003 
00004 #include "EventFilter/StorageManager/interface/MonitoredQuantity.h"
00005 
00006 #include <algorithm>
00007 #include <math.h>
00008 
00009 
00010 namespace stor {
00011   
00012   MonitoredQuantity::MonitoredQuantity
00013   (
00014     utils::Duration_t expectedCalculationInterval,
00015     utils::Duration_t timeWindowForRecentResults
00016   ):
00017   enabled_(true),
00018   expectedCalculationInterval_(expectedCalculationInterval)
00019   {
00020     setNewTimeWindowForRecentResults(timeWindowForRecentResults);
00021   }
00022   
00023   void MonitoredQuantity::addSample(const double& value)
00024   {
00025     if (! enabled_) {return;}
00026     
00027     boost::mutex::scoped_lock sl(accumulationMutex_);
00028     
00029     if ( lastCalculationTime_.is_not_a_date_time() )
00030     {
00031       lastCalculationTime_ = utils::getCurrentTime();
00032     }
00033     
00034     ++workingSampleCount_;
00035     workingValueSum_ += value;
00036     workingValueSumOfSquares_ += (value * value);
00037     
00038     if (value < workingValueMin_) workingValueMin_ = value;
00039     if (value > workingValueMax_) workingValueMax_ = value;
00040     
00041     workingLastSampleValue_ = value;
00042   }
00043   
00044   void MonitoredQuantity::addSample(const int& value)
00045   {
00046     addSample(static_cast<double>(value));
00047   }
00048   
00049   void MonitoredQuantity::addSample(const unsigned int& value)
00050   {
00051     addSample(static_cast<double>(value));
00052   }
00053   
00054   void MonitoredQuantity::addSample(const long& value)
00055   {
00056     addSample(static_cast<double>(value));
00057   }
00058   
00059   void MonitoredQuantity::addSample(const unsigned long& value)
00060   {
00061     addSample(static_cast<double>(value));
00062   }
00063   
00064   void MonitoredQuantity::addSample(const long long& value)
00065   {
00066     addSample(static_cast<double>(value));
00067   }
00068   
00069   void MonitoredQuantity::addSample(const unsigned long long& value)
00070   {
00071     addSample(static_cast<double>(value));
00072   }
00073   
00074   void MonitoredQuantity::addSampleIfLarger(const double& value)
00075   {
00076     if (value > workingLastSampleValue_)
00077       addSample(value);
00078   }
00079   
00080   void MonitoredQuantity::calculateStatistics(const utils::TimePoint_t& currentTime)
00081   {
00082     if (! enabled_) {return;}
00083     
00084     // create local copies of the working values to minimize the
00085     // time that we could block a thread trying to add a sample.
00086     // Also, reset the working values.
00087     long long latestSampleCount;
00088     double latestValueSum;
00089     double latestValueSumOfSquares;
00090     double latestValueMin;
00091     double latestValueMax;
00092     utils::Duration_t latestDuration;
00093     utils::TimePoint_t latestSnapshotTime;
00094     double latestLastLatchedSampleValue;
00095     {
00096       boost::mutex::scoped_lock sl(accumulationMutex_);
00097 
00098       if (lastCalculationTime_.is_not_a_date_time()) {return;}
00099       if (currentTime - lastCalculationTime_ < expectedCalculationInterval_) {return;}
00100       
00101       latestSampleCount = workingSampleCount_;
00102       latestValueSum = workingValueSum_;
00103       latestValueSumOfSquares = workingValueSumOfSquares_;
00104       latestValueMin = workingValueMin_;
00105       latestValueMax = workingValueMax_;
00106       latestDuration = currentTime - lastCalculationTime_;
00107       latestSnapshotTime = currentTime;
00108       latestLastLatchedSampleValue = workingLastSampleValue_;
00109       
00110       lastCalculationTime_ = currentTime;
00111       workingSampleCount_ = 0;
00112       workingValueSum_ = 0.0;
00113       workingValueSumOfSquares_ = 0.0;
00114       workingValueMin_ =  INFINITY;
00115       workingValueMax_ = -INFINITY;
00116     }
00117     
00118     // lock out any interaction with the results while we update them
00119     {
00120       boost::mutex::scoped_lock sl(resultsMutex_);
00121       lastLatchedSampleValue_ = latestLastLatchedSampleValue;
00122       
00123       // we simply add the latest results to the full set
00124       fullSampleCount_ += latestSampleCount;
00125       fullValueSum_ += latestValueSum;
00126       fullValueSumOfSquares_ += latestValueSumOfSquares;
00127       if (latestValueMin < fullValueMin_) {fullValueMin_ = latestValueMin;}
00128       if (latestValueMax > fullValueMax_) {fullValueMax_ = latestValueMax;}
00129       fullDuration_ += latestDuration;
00130       
00131       // for the recent results, we need to replace the contents of
00132       // the working bin and re-calculate the recent values
00133       binSampleCount_[workingBinId_] = latestSampleCount;
00134       binValueSum_[workingBinId_] = latestValueSum;
00135       binValueSumOfSquares_[workingBinId_] = latestValueSumOfSquares;
00136       binValueMin_[workingBinId_] = latestValueMin;
00137       binValueMax_[workingBinId_] = latestValueMax;
00138       binDuration_[workingBinId_] = latestDuration;
00139       binSnapshotTime_[workingBinId_] = latestSnapshotTime;
00140       
00141       lastLatchedValueRate_ = latestValueSum / utils::durationToSeconds(latestDuration);
00142       
00143       recentSampleCount_ = 0;
00144       recentValueSum_ = 0.0;
00145       recentValueSumOfSquares_ = 0.0;
00146       recentValueMin_ =  INFINITY;
00147       recentValueMax_ = -INFINITY;
00148       recentDuration_ = boost::posix_time::seconds(0);
00149       
00150       for (unsigned int idx = 0; idx < binCount_; ++idx) {
00151         recentSampleCount_ += binSampleCount_[idx];
00152         recentValueSum_ += binValueSum_[idx];
00153         recentValueSumOfSquares_ += binValueSumOfSquares_[idx];
00154         if (binValueMin_[idx] < recentValueMin_) {
00155           recentValueMin_ = binValueMin_[idx];
00156         }
00157         if (binValueMax_[idx] > recentValueMax_) {
00158           recentValueMax_ = binValueMax_[idx];
00159         }
00160         recentDuration_ += binDuration_[idx];
00161       }
00162       
00163       // update the working bin ID here so that we are ready for
00164       // the next calculation request
00165       ++workingBinId_;
00166       if (workingBinId_ >= binCount_) {workingBinId_ = 0;}
00167       
00168       // calculate the derived full values
00169       const double fullDuration = utils::durationToSeconds(fullDuration_);
00170       fullSampleRate_ = fullSampleCount_ / fullDuration;
00171       fullValueRate_ = fullValueSum_ / fullDuration;
00172       
00173       if (fullSampleCount_ > 0) {
00174         fullValueAverage_ = fullValueSum_ / static_cast<double>(fullSampleCount_);
00175         
00176         double squareAvg = fullValueSumOfSquares_ / static_cast<double>(fullSampleCount_);
00177         double avg = fullValueSum_ / static_cast<double>(fullSampleCount_);
00178         double sigSquared = squareAvg - avg*avg;
00179         if(sigSquared > 0.0) {
00180           fullValueRMS_ = sqrt(sigSquared);
00181         }
00182         else {
00183           fullValueRMS_ = 0.0;
00184         }
00185       }
00186       else {
00187         fullValueAverage_ = 0.0;
00188         fullValueRMS_ = 0.0;
00189       }
00190       
00191       // calculate the derived recent values
00192       const double recentDuration = utils::durationToSeconds(recentDuration_);
00193       if (recentDuration > 0) {
00194         recentSampleRate_ = recentSampleCount_ / recentDuration;
00195         recentValueRate_ = recentValueSum_ / recentDuration;
00196       }
00197       else {
00198         recentSampleRate_ = 0.0;
00199         recentValueRate_ = 0.0;
00200       }
00201       
00202       if (recentSampleCount_ > 0) {
00203         recentValueAverage_ = recentValueSum_ / static_cast<double>(recentSampleCount_);
00204         
00205         double squareAvg = recentValueSumOfSquares_ /
00206           static_cast<double>(recentSampleCount_);
00207         double avg = recentValueSum_ / static_cast<double>(recentSampleCount_);
00208         double sigSquared = squareAvg - avg*avg;
00209         if(sigSquared > 0.0) {
00210           recentValueRMS_ = sqrt(sigSquared);
00211         }
00212         else {
00213           recentValueRMS_ = 0.0;
00214         }
00215       }
00216       else {
00217         recentValueAverage_ = 0.0;
00218         recentValueRMS_ = 0.0;
00219       }
00220     }
00221   }
00222   
00223   void MonitoredQuantity::resetAccumulators()
00224   {
00225     lastCalculationTime_ = boost::posix_time::not_a_date_time;
00226     workingSampleCount_ = 0;
00227     workingValueSum_ = 0.0;
00228     workingValueSumOfSquares_ = 0.0;
00229     workingValueMin_ =  INFINITY;
00230     workingValueMax_ = -INFINITY;
00231     workingLastSampleValue_ = 0;
00232   }
00233   
00234   void MonitoredQuantity::resetResults()
00235   {
00236     workingBinId_ = 0;
00237     for (unsigned int idx = 0; idx < binCount_; ++idx) {
00238       binSampleCount_[idx] = 0;
00239       binValueSum_[idx] = 0.0;
00240       binValueSumOfSquares_[idx] = 0.0;
00241       binValueMin_[idx] =  INFINITY;
00242       binValueMax_[idx] = -INFINITY;
00243       binDuration_[idx] = boost::posix_time::seconds(0);
00244       binSnapshotTime_[idx] = boost::posix_time::not_a_date_time;
00245     }
00246     
00247     fullSampleCount_ = 0;
00248     fullSampleRate_ = 0.0;
00249     fullValueSum_ = 0.0;
00250     fullValueSumOfSquares_ = 0.0;
00251     fullValueAverage_ = 0.0;
00252     fullValueRMS_ = 0.0;
00253     fullValueMin_ =  INFINITY;
00254     fullValueMax_ = -INFINITY;
00255     fullValueRate_ = 0.0;
00256     fullDuration_ = boost::posix_time::seconds(0);
00257     
00258     recentSampleCount_ = 0;
00259     recentSampleRate_ = 0.0;
00260     recentValueSum_ = 0.0;
00261     recentValueSumOfSquares_ = 0.0;
00262     recentValueAverage_ = 0.0;
00263     recentValueRMS_ = 0.0;
00264     recentValueMin_ =  INFINITY;
00265     recentValueMax_ = -INFINITY;
00266     recentValueRate_ = 0.0;
00267     recentDuration_ = boost::posix_time::seconds(0);
00268     lastLatchedSampleValue_ = 0.0;
00269     lastLatchedValueRate_ = 0.0;
00270   }
00271   
00272   void MonitoredQuantity::reset()
00273   {
00274     {
00275       boost::mutex::scoped_lock sl(accumulationMutex_);
00276       resetAccumulators();
00277     }
00278     
00279     {
00280       boost::mutex::scoped_lock sl(resultsMutex_);
00281       resetResults();
00282     }
00283   }
00284   
00285   void MonitoredQuantity::enable()
00286   {
00287     if (! enabled_) {
00288       reset();
00289       enabled_ = true;
00290     }
00291   }
00292   
00293   void MonitoredQuantity::disable()
00294   {
00295     // It is faster to just set enabled_ to false than to test and set
00296     // it conditionally.
00297     enabled_ = false;
00298   }
00299   
00300   void MonitoredQuantity::setNewTimeWindowForRecentResults(const utils::Duration_t& interval)
00301   {
00302     // lock the results objects since we're dramatically changing the
00303     // bins used for the recent results
00304     {
00305       boost::mutex::scoped_lock sl(resultsMutex_);
00306       
00307       intervalForRecentStats_ = interval;
00308       
00309       // determine how many bins we should use in our sliding window
00310       // by dividing the input time window by the expected calculation
00311       // interval and rounding to the nearest integer.
00312       // In case that the calculation interval is larger then the 
00313       // interval for recent stats, keep the last one.
00314       binCount_ = std::max(1U,
00315         static_cast<unsigned int>(
00316           (intervalForRecentStats_.total_nanoseconds() / expectedCalculationInterval_.total_nanoseconds()) + 0.5
00317         )      
00318       );
00319       
00320       // create the vectors for the binned quantities
00321       binSampleCount_.reserve(binCount_);
00322       binValueSum_.reserve(binCount_);
00323       binValueSumOfSquares_.reserve(binCount_);
00324       binValueMin_.reserve(binCount_);
00325       binValueMax_.reserve(binCount_);
00326       binDuration_.reserve(binCount_);
00327       binSnapshotTime_.reserve(binCount_);
00328       
00329       resetResults();
00330     }
00331     
00332     {
00333       boost::mutex::scoped_lock sl(accumulationMutex_);
00334       resetAccumulators();
00335     }
00336     
00337     // call the reset method to populate the correct initial values
00338     // for the internal sample data
00339     //reset();
00340   }
00341   
00342   void
00343   MonitoredQuantity::getStats(Stats& s) const
00344   {
00345     boost::mutex::scoped_lock results(resultsMutex_);
00346     
00347     s.fullSampleCount = fullSampleCount_;
00348     s.fullSampleRate = fullSampleRate_;
00349     s.fullValueSum = fullValueSum_;
00350     s.fullValueSumOfSquares = fullValueSumOfSquares_;
00351     s.fullValueAverage = fullValueAverage_;
00352     s.fullValueRMS = fullValueRMS_;
00353     s.fullValueMin = fullValueMin_;
00354     s.fullValueMax = fullValueMax_;
00355     s.fullValueRate = fullValueRate_;
00356     s.fullDuration = fullDuration_;
00357     
00358     s.recentSampleCount = recentSampleCount_;
00359     s.recentSampleRate = recentSampleRate_;
00360     s.recentValueSum = recentValueSum_;
00361     s.recentValueSumOfSquares = recentValueSumOfSquares_;
00362     s.recentValueAverage = recentValueAverage_;
00363     s.recentValueRMS = recentValueRMS_;
00364     s.recentValueMin = recentValueMin_;
00365     s.recentValueMax = recentValueMax_;
00366     s.recentValueRate = recentValueRate_;
00367     s.recentDuration = recentDuration_;
00368     
00369     s.recentBinnedSampleCounts.resize(binCount_);
00370     s.recentBinnedValueSums.resize(binCount_);
00371     s.recentBinnedDurations.resize(binCount_);
00372     s.recentBinnedSnapshotTimes.resize(binCount_);
00373     uint32_t sourceBinId = workingBinId_;
00374     for (uint32_t idx = 0; idx < binCount_; ++idx) {
00375       if (sourceBinId >= binCount_) {sourceBinId = 0;}
00376       s.recentBinnedSampleCounts[idx] = binSampleCount_[sourceBinId];
00377       s.recentBinnedValueSums[idx] = binValueSum_[sourceBinId];
00378       s.recentBinnedDurations[idx] = binDuration_[sourceBinId];
00379       s.recentBinnedSnapshotTimes[idx] = binSnapshotTime_[sourceBinId];
00380       ++sourceBinId;
00381     }
00382     
00383     s.lastSampleValue = lastLatchedSampleValue_;
00384     s.lastValueRate = lastLatchedValueRate_;
00385     s.enabled = enabled_;
00386   }
00387   
00388 } // namespace stor
00389 
00390