00001
00005 #include "EventFilter/StorageManager/interface/RollingIntervalCounter.h"
00006
00007 using namespace stor;
00008
00012 RollingIntervalCounter::
00013 RollingIntervalCounter(double timeWindowSize, double timeBinSize,
00014 double validSubWindowSize, AccumulationStyle style):
00015 accumStyle_(style),
00016 startTime_(0.0),
00017 processedBinCount_(0),
00018 workingBinSum_(0.0),
00019 workingBinSampleCount_(0),
00020 workingBinId_(-1),
00021 currentTotal_(0.0),
00022 currentSampleCount_(0)
00023 {
00024
00025 if (timeWindowSize <= 0.0) {
00026 this->windowSize_ = 180.0;
00027 }
00028 else {
00029 this->windowSize_ = timeWindowSize;
00030 }
00031
00032
00033 if (timeBinSize <= 0.0) {
00034 this->binSize_ = this->windowSize_;
00035 }
00036 else {
00037 this->binSize_ = timeBinSize;
00038 }
00039
00040
00041 if (this->windowSize_ > 0.0 && this->binSize_ > 0.0) {
00042 this->binCount_ = (int) (0.5 + (this->windowSize_ / this->binSize_));
00043 if (this->binCount_ < 1) {
00044 this->binCount_ = 1;
00045 }
00046
00047
00048 this->windowSize_ = this->binCount_ * this->binSize_;
00049 }
00050 else {
00051 this->binCount_ = 1;
00052 }
00053
00054
00055 this->validBinCount_ = (int) (0.5 + (validSubWindowSize / this->binSize_));
00056 if (this->validBinCount_ <= 0) {
00057 this->validBinCount_ = 1;
00058 }
00059
00060
00061 if (this->accumStyle_ == INCLUDE_SAMPLES_IMMEDIATELY) {
00062 processedBinCount_ = 1;
00063 }
00064
00065
00066 this->binContents_.reset(new std::vector<double>);
00067 this->binSamples_.reset(new std::vector<unsigned int>);
00068 for (int idx = 0; idx < this->binCount_; ++idx) {
00069 this->binContents_->push_back(0.0);
00070 this->binSamples_->push_back(0);
00071 }
00072 }
00073
00077 void RollingIntervalCounter::addSample(double value, double currentTime)
00078 {
00079 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00080
00081
00082 if (workingBinId_ < 0) {
00083 workingBinId_ = getBinId(currentTime);
00084 }
00085
00086
00087 if (startTime_ <= 0.0) {
00088 startTime_ = currentTime;
00089 }
00090
00091
00092 shuffleBins(currentTime);
00093
00094
00095
00096 if (accumStyle_ == INCLUDE_SAMPLES_AFTER_BINNING) {
00097 workingBinSum_ += value;
00098 ++workingBinSampleCount_;
00099 }
00100 else {
00101 int binIndex = (int) (workingBinId_ % binCount_);
00102 (*binContents_)[binIndex] += value;
00103 currentTotal_ += value;
00104 (*binSamples_)[binIndex] += 1;
00105 ++currentSampleCount_;
00106 }
00107 }
00108
00112 bool RollingIntervalCounter::hasValidResult(double currentTime)
00113 {
00114 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00115 shuffleBins(currentTime);
00116
00117 return (processedBinCount_ >= validBinCount_);
00118 }
00119
00125 unsigned int RollingIntervalCounter::getSampleCount(double currentTime)
00126 {
00127 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00128 shuffleBins(currentTime);
00129
00130 return currentSampleCount_;
00131 }
00132
00138 double RollingIntervalCounter::getSampleRate(double currentTime)
00139 {
00140 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00141 shuffleBins(currentTime);
00142
00143 double duration = getDuration();
00144 if (duration > 0.0) {
00145 return ((double) getSampleCount()) / duration;
00146 }
00147 else {
00148 return 0.0;
00149 }
00150 }
00151
00155 double RollingIntervalCounter::getValueSum(double currentTime)
00156 {
00157 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00158 shuffleBins(currentTime);
00159
00160 return currentTotal_;
00161 }
00162
00167 double RollingIntervalCounter::getValueAverage(double currentTime)
00168 {
00169 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00170 shuffleBins(currentTime);
00171
00172 if (getSampleCount() > 0) {
00173 return (currentTotal_ / ((double) getSampleCount()));
00174 }
00175 else {
00176 return 0.0;
00177 }
00178 }
00179
00186 double RollingIntervalCounter::getValueRate(double currentTime)
00187 {
00188 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00189 shuffleBins(currentTime);
00190
00191 double duration = getDuration();
00192 if (duration > 0.0) {
00193 return (currentTotal_ / duration);
00194 }
00195 else {
00196 return 0.0;
00197 }
00198 }
00199
00205 double RollingIntervalCounter::getDuration(double currentTime)
00206 {
00207 boost::recursive_mutex::scoped_lock sl(dataMutex_);
00208 shuffleBins(currentTime);
00209
00210
00211
00212 if (! hasValidResult()) {
00213 return 0.0;
00214 }
00215 else if (processedBinCount_ > binCount_) {
00216 return (((double) binCount_) * binSize_);
00217 }
00218 else {
00219 double val1 = ((double) processedBinCount_) * binSize_;
00220 double val2 = (workingBinId_ * binSize_) - startTime_;
00221 if (accumStyle_ == INCLUDE_SAMPLES_IMMEDIATELY) {
00222 val2 = currentTime - startTime_;
00223 }
00224
00225 if (val1 < val2) {return val1;}
00226 else {return val2;}
00227 }
00228 }
00229
00233 void RollingIntervalCounter::dumpData(std::ostream& outStream)
00234 {
00235 outStream << "RollingIntervalCounter 0x" << std::hex
00236 << ((int) this) << std::dec << std::endl;
00237 char nowString[32];
00238 sprintf(nowString, "%16.4f", getCurrentTime());
00239 outStream << " Now = " << nowString << std::endl;
00240 outStream << " Window size = " << windowSize_ << std::endl;
00241 outStream << " Bin size = " << binSize_ << std::endl;
00242 outStream << " Processed bin count = " << processedBinCount_ << std::endl;
00243 outStream << " Working index = "
00244 << ((int) (workingBinId_ % binCount_)) << std::endl;
00245 outStream << " Working value = " << workingBinSum_ << std::endl;
00246 outStream << " Current total = " << currentTotal_ << std::endl;
00247 outStream << " Working sample count = "
00248 << workingBinSampleCount_ << std::endl;
00249 outStream << " Current sample count = " << currentSampleCount_ << std::endl;
00250
00251 char binString[200];
00252 for (int idx = 0; idx < binCount_; idx++) {
00253 sprintf(binString,
00254 " bin %2d, value %10.2f, sampleCount %10d",
00255 idx, (*binContents_)[idx], (*binSamples_)[idx]);
00256 outStream << binString << std::endl;
00257 }
00258 }
00259
00264 void RollingIntervalCounter::shuffleBins(double currentTime)
00265 {
00266
00267 if (workingBinId_ < 0) {
00268 return;
00269 }
00270
00271
00272 long long currentTimeId = getBinId(currentTime);
00273
00274
00275
00276
00277
00278
00279 if (currentTimeId == workingBinId_) {
00280 return;
00281 }
00282
00283
00284 long long firstIdToClear = workingBinId_;
00285 long long lastIdToClear = currentTimeId - 1;
00286 if (accumStyle_ == INCLUDE_SAMPLES_IMMEDIATELY) {
00287 ++firstIdToClear;
00288 ++lastIdToClear;
00289 }
00290 for (long long idx = firstIdToClear; idx <= lastIdToClear; ++idx) {
00291 int binIndex = (int) (idx % binCount_);
00292 currentTotal_ -= (*binContents_)[binIndex];
00293 (*binContents_)[binIndex] = 0.0;
00294 currentSampleCount_ -= (*binSamples_)[binIndex];
00295 (*binSamples_)[binIndex] = 0;
00296 ++processedBinCount_;
00297 }
00298
00299
00300 if (accumStyle_ == INCLUDE_SAMPLES_AFTER_BINNING) {
00301 int binIndex = (int) (workingBinId_ % binCount_);
00302 (*binContents_)[binIndex] = workingBinSum_;
00303 currentTotal_ += workingBinSum_;
00304 workingBinSum_ = 0.0;
00305 (*binSamples_)[binIndex] = workingBinSampleCount_;
00306 currentSampleCount_ += workingBinSampleCount_;
00307 workingBinSampleCount_ = 0;
00308 }
00309
00310
00311 workingBinId_ = currentTimeId;
00312 }
00313
00317 long long RollingIntervalCounter::getBinId(double currentTime)
00318 {
00319 return (long long) (currentTime / binSize_);
00320 }