CMS 3D CMS Logo

RollingIntervalCounter.cc

Go to the documentation of this file.
00001 
00005 #include "EventFilter/StorageManager/interface/RollingIntervalCounter.h"
00006 
00007 using namespace stor;
00008 
00012 RollingIntervalCounter::
00013 RollingIntervalCounter(double timeWindowSize, double timeBinSize,
00014                        double validSubWindowSize, AccumulationStyle style):
00015   accumStyle_(style),
00016   startTime_(0.0),
00017   processedBinCount_(0),
00018   workingBinSum_(0.0),
00019   workingBinSampleCount_(0),
00020   workingBinId_(-1),
00021   currentTotal_(0.0),
00022   currentSampleCount_(0)
00023 {
00024   // provide a reasonable default for the window size, if needed
00025   if (timeWindowSize <= 0.0) {
00026     this->windowSize_ = 180.0;  // 3 minutes
00027   }
00028   else {
00029     this->windowSize_ = timeWindowSize;
00030   }
00031 
00032   // provide a reasonable default for the bin size, if needed
00033   if (timeBinSize <= 0.0) {
00034     this->binSize_ = this->windowSize_;
00035   }
00036   else {
00037     this->binSize_ = timeBinSize;
00038   }
00039 
00040   // determine the bin count from the window and bin sizes
00041   if (this->windowSize_ > 0.0 && this->binSize_ > 0.0) {
00042     this->binCount_ = (int) (0.5 + (this->windowSize_ / this->binSize_));
00043     if (this->binCount_ < 1) {
00044       this->binCount_ = 1;
00045     }
00046 
00047     // recalculate the window size to handle rounding
00048     this->windowSize_ = this->binCount_ * this->binSize_;
00049   }
00050   else {
00051     this->binCount_ = 1;
00052   }
00053 
00054   // determine the number of bins needed for a valid result
00055   this->validBinCount_ = (int) (0.5 + (validSubWindowSize / this->binSize_));
00056   if (this->validBinCount_ <= 0) {
00057     this->validBinCount_ = 1;
00058   }
00059 
00060   // if working with data immediately, assume the first bin is good
00061   if (this->accumStyle_ == INCLUDE_SAMPLES_IMMEDIATELY) {
00062     processedBinCount_ = 1;
00063   }
00064 
00065   // initialize times and bins
00066   this->binContents_.reset(new std::vector<double>);
00067   this->binSamples_.reset(new std::vector<unsigned int>);
00068   for (int idx = 0; idx < this->binCount_; ++idx) {
00069     this->binContents_->push_back(0.0);
00070     this->binSamples_->push_back(0);
00071   }
00072 }
00073 
00077 void RollingIntervalCounter::addSample(double value, double currentTime)
00078 {
00079   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00080 
00081   // initialize the working bin, if needed
00082   if (workingBinId_ < 0) {
00083     workingBinId_ = getBinId(currentTime);
00084   }
00085 
00086   // initialize the start time, if needed
00087   if (startTime_ <= 0.0) {
00088     startTime_ = currentTime;
00089   }
00090 
00091   // shuffle the bins so that they correctly reflect the sample window
00092   shuffleBins(currentTime);
00093 
00094   // add the new sample to either the working sum or the current
00095   // sum depending on the accumulation style
00096   if (accumStyle_ == INCLUDE_SAMPLES_AFTER_BINNING) {
00097     workingBinSum_ += value;
00098     ++workingBinSampleCount_;
00099   }
00100   else {
00101     int binIndex = (int) (workingBinId_ % binCount_);
00102     (*binContents_)[binIndex] += value;
00103     currentTotal_ += value;
00104     (*binSamples_)[binIndex] += 1;
00105     ++currentSampleCount_;
00106   }
00107 }
00108 
00112 bool RollingIntervalCounter::hasValidResult(double currentTime)
00113 {
00114   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00115   shuffleBins(currentTime);
00116 
00117   return (processedBinCount_ >= validBinCount_);
00118 }
00119 
00125 unsigned int RollingIntervalCounter::getSampleCount(double currentTime)
00126 {
00127   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00128   shuffleBins(currentTime);
00129 
00130   return currentSampleCount_;
00131 }
00132 
00138 double RollingIntervalCounter::getSampleRate(double currentTime)
00139 {
00140   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00141   shuffleBins(currentTime);
00142 
00143   double duration = getDuration();
00144   if (duration > 0.0) {
00145     return ((double) getSampleCount()) / duration;
00146   }
00147   else {
00148     return 0.0;
00149   }
00150 }
00151 
00155 double RollingIntervalCounter::getValueSum(double currentTime)
00156 {
00157   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00158   shuffleBins(currentTime);
00159 
00160   return currentTotal_;
00161 }
00162 
00167 double RollingIntervalCounter::getValueAverage(double currentTime)
00168 {
00169   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00170   shuffleBins(currentTime);
00171 
00172   if (getSampleCount() > 0) {
00173     return (currentTotal_ / ((double) getSampleCount()));
00174   }
00175   else {
00176     return 0.0;
00177   }
00178 }
00179 
00186 double RollingIntervalCounter::getValueRate(double currentTime)
00187 {
00188   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00189   shuffleBins(currentTime);
00190 
00191   double duration = getDuration();
00192   if (duration > 0.0) {
00193     return (currentTotal_ / duration);
00194   }
00195   else {
00196     return 0.0;
00197   }
00198 }
00199 
00205 double RollingIntervalCounter::getDuration(double currentTime)
00206 {
00207   boost::recursive_mutex::scoped_lock sl(dataMutex_);
00208   shuffleBins(currentTime);
00209 
00210   // include a lastTime_ for immediate mode??
00211 
00212   if (! hasValidResult()) {
00213     return 0.0;
00214   }
00215   else if (processedBinCount_ > binCount_) {
00216     return (((double) binCount_) * binSize_);
00217   }
00218   else {
00219     double val1 = ((double) processedBinCount_) * binSize_;
00220     double val2 = (workingBinId_ * binSize_) - startTime_;
00221     if (accumStyle_ == INCLUDE_SAMPLES_IMMEDIATELY) {
00222       val2 = currentTime - startTime_;
00223     }
00224     //std::cout << "Val1 = " << val1 << ", val2 = " << val2 << std::endl;
00225     if (val1 < val2) {return val1;}
00226     else {return val2;}
00227   }
00228 }
00229 
00233 void RollingIntervalCounter::dumpData(std::ostream& outStream)
00234 {
00235   outStream << "RollingIntervalCounter 0x" << std::hex
00236             << ((int) this) << std::dec << std::endl;
00237   char nowString[32];
00238   sprintf(nowString, "%16.4f", getCurrentTime());
00239   outStream << "  Now = " << nowString << std::endl;
00240   outStream << "  Window size = " << windowSize_ << std::endl;
00241   outStream << "  Bin size = " << binSize_ << std::endl;
00242   outStream << "  Processed bin count = " << processedBinCount_ << std::endl;
00243   outStream << "  Working index = "
00244             << ((int) (workingBinId_ % binCount_)) << std::endl;
00245   outStream << "  Working value = " << workingBinSum_ << std::endl;
00246   outStream << "  Current total = " << currentTotal_ << std::endl;
00247   outStream << "  Working sample count = "
00248             << workingBinSampleCount_ << std::endl;
00249   outStream << "  Current sample count = " << currentSampleCount_ << std::endl;
00250 
00251   char binString[200];
00252   for (int idx = 0; idx < binCount_; idx++) {
00253     sprintf(binString,
00254             "    bin %2d, value %10.2f, sampleCount %10d",
00255             idx, (*binContents_)[idx], (*binSamples_)[idx]);
00256     outStream << binString << std::endl;
00257   }
00258 }
00259 
00264 void RollingIntervalCounter::shuffleBins(double currentTime)
00265 {
00266   // don't start shuffling until the first sample has been added
00267   if (workingBinId_ < 0) {
00268     return;
00269   }
00270 
00271   // determine the current time bin
00272   long long currentTimeId = getBinId(currentTime);
00273   //std::cout << "RollingIntervalCounter 0x" << std::hex
00274   //          << ((int) this) << std::dec << std::endl;
00275   //std::cout << "Shuffle: " << currentTimeId
00276   //          << ", " << workingBinId_ << std::endl;
00277 
00278   // if we're still working on the current time bin, no shuffling is needed
00279   if (currentTimeId == workingBinId_) {
00280     return;
00281   }
00282 
00283   // clear out entries in the list whose time has passed
00284   long long firstIdToClear = workingBinId_;
00285   long long lastIdToClear = currentTimeId - 1;
00286   if (accumStyle_ == INCLUDE_SAMPLES_IMMEDIATELY) {
00287     ++firstIdToClear;
00288     ++lastIdToClear;
00289   }
00290   for (long long idx = firstIdToClear; idx <= lastIdToClear; ++idx) {
00291     int binIndex = (int) (idx % binCount_);
00292     currentTotal_ -= (*binContents_)[binIndex];
00293     (*binContents_)[binIndex] = 0.0;
00294     currentSampleCount_ -= (*binSamples_)[binIndex];
00295     (*binSamples_)[binIndex] = 0;
00296     ++processedBinCount_;
00297   }
00298 
00299   // move the working bin value into the list, if needed
00300   if (accumStyle_ == INCLUDE_SAMPLES_AFTER_BINNING) {
00301     int binIndex = (int) (workingBinId_ % binCount_);
00302     (*binContents_)[binIndex] = workingBinSum_;
00303     currentTotal_ += workingBinSum_;
00304     workingBinSum_ = 0.0;
00305     (*binSamples_)[binIndex] = workingBinSampleCount_;
00306     currentSampleCount_ += workingBinSampleCount_;
00307     workingBinSampleCount_ = 0;
00308   }
00309 
00310   // update the working bin ID to the current time bin
00311   workingBinId_ = currentTimeId;
00312 }
00313 
00317 long long RollingIntervalCounter::getBinId(double currentTime)
00318 {
00319   return (long long) (currentTime / binSize_);
00320 }

Generated on Tue Jun 9 17:34:57 2009 for CMSSW by  doxygen 1.5.4