00001
00005 #include "EventFilter/StorageManager/interface/RateLimiter.h"
00006 #include "EventFilter/StorageManager/interface/BaseCounter.h"
00007
00008 using namespace stor;
00009
00015 RateLimiter::RateLimiter(double maxEventRate, double maxDataRate)
00016 {
00017 this->maxEventRate_ = maxEventRate;
00018 this->maxDataRate_ = maxDataRate;
00019
00020 generator_.reset(new boost::uniform_01<boost::mt19937>(baseGenerator_));
00021 }
00022
00026 void RateLimiter::addConsumer(uint32 consumerId)
00027 {
00028 boost::mutex::scoped_lock sl(dataMutex_);
00029
00030
00031 consumerList_.push_back(consumerId);
00032
00033
00034 boost::shared_ptr<RollingIntervalCounter>
00035 analyzer(new RollingIntervalCounter(180.0, 5.0, 10.0));
00036 dataRateTable_[consumerId] = analyzer;
00037 }
00038
00042 void RateLimiter::removeConsumer(uint32 consumerId)
00043 {
00044 boost::mutex::scoped_lock sl(dataMutex_);
00045
00046
00047 dataRateTable_.erase(consumerId);
00048
00049
00050 std::vector<uint32>::iterator vecIter =
00051 std::find(consumerList_.begin(), consumerList_.end(), consumerId);
00052 consumerList_.erase(vecIter);
00053 }
00054
00062
00063 std::vector<uint32> RateLimiter::
00064 getAllowedConsumersFromList(double dataSize,
00065 const std::vector<uint32>& candidateList)
00066 {
00067 boost::mutex::scoped_lock sl(dataMutex_);
00068
00069
00070
00071 boost::shared_ptr<RollingIntervalCounter> rateAnalyzer;
00072 std::vector<uint32> allowedList;
00073 double now = BaseCounter::getCurrentTime();
00074
00075
00076 if (candidateList.size() == 0) {
00077 return allowedList;
00078 }
00079
00080
00081 for (uint32 idx = 0; idx < candidateList.size(); ++idx) {
00082 uint32 consumerId = candidateList[idx];
00083 rateAnalyzer = dataRateTable_[consumerId];
00084 if (rateAnalyzer.get() != 0) {
00085 rateAnalyzer->addSample(dataSize, now);
00086 }
00087 }
00088
00089
00090
00091 uint32 consumerCount = consumerList_.size();
00092 std::vector<double> rawEventRates;
00093 std::vector<double> rawDataRates;
00094 std::vector<double> eventPrescales;
00095 std::vector<double> dataPrescales;
00096
00097
00098 for (uint32 idx = 0; idx < consumerCount; ++idx) {
00099 uint32 consumerId = consumerList_[idx];
00100 rateAnalyzer = dataRateTable_[consumerId];
00101
00102 double eventRate = rateAnalyzer->getSampleRate(now);
00103
00104 if (eventRate > 0.0) {
00105
00106 rawEventRates.push_back(eventRate);
00107 }
00108
00109 else if (! rateAnalyzer->hasValidResult()) {
00110
00111
00112 rawEventRates.push_back(10.0 * maxEventRate_ / consumerCount);
00113 }
00114 else {
00115
00116
00117
00118 rawEventRates.push_back(0.1 * maxEventRate_ / consumerCount);
00119 }
00120
00121 double dataRate = rateAnalyzer->getValueRate(now);
00122 if (dataRate > 0.0) {
00123
00124 rawDataRates.push_back(dataRate);
00125 }
00126
00127 else if (! rateAnalyzer->hasValidResult()) {
00128
00129
00130 rawDataRates.push_back(10.0 * maxDataRate_ / consumerCount);
00131 }
00132 else {
00133
00134
00135
00136 rawDataRates.push_back(0.1 * maxDataRate_ / consumerCount);
00137 }
00138
00139
00140
00141
00142
00143
00144
00145
00146 }
00147
00148
00149 determineTargetPrescales(maxEventRate_, rawEventRates, eventPrescales);
00150 determineTargetPrescales(maxDataRate_, rawDataRates, dataPrescales);
00151
00152
00153 std::vector<double> overallPrescales;
00154 std::vector<double> minimumPrescales;
00155 int eventLimitCount = 0;
00156 int dataLimitCount = 0;
00157 for (uint32 idx = 0; idx < consumerCount; ++idx) {
00158 if (eventPrescales[idx] > dataPrescales[idx]) {
00159 overallPrescales.push_back(eventPrescales[idx]);
00160 minimumPrescales.push_back(dataPrescales[idx]);
00161 ++eventLimitCount;
00162 }
00163 else if (dataPrescales[idx] > eventPrescales[idx]) {
00164 overallPrescales.push_back(dataPrescales[idx]);
00165 minimumPrescales.push_back(eventPrescales[idx]);
00166 ++dataLimitCount;
00167 }
00168 else {
00169 overallPrescales.push_back(eventPrescales[idx]);
00170 minimumPrescales.push_back(dataPrescales[idx]);
00171 }
00172 }
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186 if (eventLimitCount > 0 && dataLimitCount > 0) {
00187 loosenPrescales(rawEventRates, maxEventRate_,
00188 rawDataRates, maxDataRate_,
00189 overallPrescales, minimumPrescales);
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199 }
00200
00201
00202
00203 for (uint32 idx = 0; idx < candidateList.size(); ++idx) {
00204 uint32 consumerId = candidateList[idx];
00205 for (uint32 jdx = 0; jdx < consumerCount; ++jdx) {
00206 if (consumerList_[jdx] == consumerId) {
00207 double psValue = overallPrescales[jdx];
00208 if (psValue <= 1.0) {
00209 allowedList.push_back(consumerId);
00210 }
00211 else {
00212 double instantRatio = 1.0 / psValue;
00213 double randValue = (*generator_)();
00214 if (randValue < instantRatio) {
00215 allowedList.push_back(consumerId);
00216 }
00217 }
00218 break;
00219 }
00220 }
00221 }
00222
00223
00224 return allowedList;
00225 }
00226
00231 void RateLimiter::dumpData(std::ostream& outStream)
00232 {
00233 boost::shared_ptr<RollingIntervalCounter> rateAnalyzer;
00234 char nowString[32];
00235
00236 outStream << "RateLimiter 0x" << std::hex
00237 << ((int) this) << std::dec << std::endl;
00238 sprintf(nowString, "%16.4f", BaseCounter::getCurrentTime());
00239 outStream << " Now = " << nowString << std::endl;
00240 outStream << " Consumers:" << std::endl;
00241 for (uint32 idx = 0; idx < consumerList_.size(); ++idx) {
00242 uint32 consumerId = consumerList_[idx];
00243 rateAnalyzer = dataRateTable_[consumerId];
00244 outStream << " ID = " << consumerId
00245 << ", event rate = " << rateAnalyzer->getSampleRate()
00246 << ", data rate = " << rateAnalyzer->getValueRate()
00247 << std::endl;
00248 }
00249 }
00250
00255 double RateLimiter::calcRate(std::vector<double> rates,
00256 std::vector<double> prescales)
00257 {
00258 double sum = 0.0;
00259 for (uint32 idx = 0; idx < rates.size(); ++idx) {
00260 sum += rates[idx] / prescales[idx];
00261 }
00262 return sum;
00263 }
00264
00270 void RateLimiter::determineTargetPrescales(double fullRate,
00271 const std::vector<double>& rawRates,
00272 std::vector<double>& targetPrescales)
00273 {
00274 uint32 rateCount = rawRates.size();
00275
00276
00277 std::vector<double> targetRates;
00278 determineTargetRates(fullRate, rawRates, targetRates);
00279 assert(targetRates.size() == rateCount);
00280
00281
00282 targetPrescales.clear();
00283 for (uint32 idx = 0; idx < rateCount; ++idx) {
00284 if (targetRates[idx] > 0.0) {
00285 targetPrescales.push_back(rawRates[idx] / targetRates[idx]);
00286 }
00287 else {
00288 targetPrescales.push_back(1.0);
00289 }
00290 }
00291 }
00292
00298 void RateLimiter::determineTargetRates(double fullRate,
00299 const std::vector<double>& rawRates,
00300 std::vector<double>& targetRates)
00301 {
00302 uint32 rateCount = rawRates.size();
00303 targetRates.clear();
00304
00305
00306 if (rateCount == 1) {
00307 targetRates.push_back(std::min(fullRate, rawRates[0]));
00308 return;
00309 }
00310
00311
00312 double sum = 0.0;
00313 for (uint32 idx = 0; idx < rateCount; ++idx) {
00314 sum += rawRates[idx];
00315 }
00316 if (sum <= fullRate) {
00317 targetRates.resize(rateCount);
00318 std::copy(rawRates.begin(), rawRates.end(), targetRates.begin());
00319 return;
00320 }
00321
00322
00323
00324 for (uint32 idx = 0; idx < rateCount; ++idx) {
00325 targetRates.push_back(-1.0);
00326 }
00327
00328
00329 fairShareAlgo(fullRate, rawRates, targetRates);
00330 }
00331
00339 void RateLimiter::fairShareAlgo(double fullRate,
00340 const std::vector<double>& rawRates,
00341 std::vector<double>& targetRates)
00342 {
00343 uint32 rateCount = rawRates.size();
00344
00345
00346 int targetCount = 0;
00347 for (uint32 idx = 0; idx < rateCount; ++idx) {
00348 if (targetRates[idx] < 0.0) {
00349 ++targetCount;
00350 }
00351 }
00352
00353
00354 if (targetCount == 0) {return;}
00355
00356
00357 double fairShareRate = fullRate / (double) targetCount;
00358
00359
00360 double accomodatedRate = 0.0;
00361 int graceCount = 0;
00362 for (uint32 idx = 0; idx < rateCount; ++idx) {
00363 if (targetRates[idx] < 0.0 && rawRates[idx] < fairShareRate) {
00364 targetRates[idx] = rawRates[idx];
00365 accomodatedRate += rawRates[idx];
00366 ++graceCount;
00367 }
00368 }
00369
00370
00371 if (graceCount > 0) {
00372 fairShareAlgo((fullRate - accomodatedRate), rawRates, targetRates);
00373 }
00374
00375
00376 else {
00377 for (uint32 idx = 0; idx < rateCount; ++idx) {
00378 if (targetRates[idx] < 0.0) {
00379 targetRates[idx] = fairShareRate;
00380 }
00381 }
00382 }
00383 }
00384
00390 void RateLimiter::loosenPrescales(const std::vector<double>& rawRates1,
00391 double fullRate1,
00392 const std::vector<double>& rawRates2,
00393 double fullRate2,
00394 std::vector<double>& prescales,
00395 const std::vector<double>& minPrescales)
00396 {
00397
00398 double fom;
00399 double baseFOM = calcFigureOfMerit(rawRates1, fullRate1,
00400 rawRates2, fullRate2,
00401 prescales);
00402
00403
00404
00405 std::vector<double> fomImpact;
00406 fomImpact.resize(prescales.size());
00407 for (uint32 idx = 0; idx < prescales.size(); ++idx) {
00408 if (prescales[idx] > 1.0) {
00409 prescales[idx] *= 0.99;
00410 fom = calcFigureOfMerit(rawRates1, fullRate1,
00411 rawRates2, fullRate2,
00412 prescales);
00413 if ((baseFOM - fom) >= 0.0) {
00414 fomImpact[idx] = baseFOM - fom;
00415 }
00416 else {
00417 fomImpact[idx] = fom - baseFOM;
00418 }
00419 prescales[idx] /= 0.99;
00420 }
00421 else {
00422 fomImpact[idx] = 0.0;
00423 }
00424 }
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435 std::vector<int> impactOrder;
00436 for (uint32 idx = 0; idx < prescales.size(); ++idx) {
00437 double maxValue = -1.0;
00438 int maxIndex = -1;
00439 for (uint32 jdx = 0; jdx < prescales.size(); ++jdx) {
00440 if (fomImpact[jdx] >= 0.0 && fomImpact[jdx] > maxValue) {
00441 maxValue = fomImpact[jdx];
00442 maxIndex = jdx;
00443 }
00444 }
00445 fomImpact[maxIndex] = -1.0;
00446 impactOrder.push_back(maxIndex);
00447 }
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460 fom = baseFOM;
00461 bool allDone = false;
00462 int watchdogCounter = -1;
00463 while (! allDone && ++watchdogCounter < 10) {
00464
00465
00466 allDone = true;
00467 for (uint32 idx = 0; idx < prescales.size(); ++idx) {
00468
00469
00470 uint32 psIdx = impactOrder[idx];
00471
00472
00473 if (prescales[psIdx] > 1.0) {
00474
00475
00476
00477
00478 if (loosenOnePrescale(rawRates1, fullRate1, rawRates2, fullRate2,
00479 prescales, psIdx, minPrescales[psIdx])) {
00480 allDone = false;
00481 }
00482 }
00483 }
00484 }
00485 }
00486
00492 bool RateLimiter::loosenOnePrescale(const std::vector<double>& rawRates1,
00493 double fullRate1,
00494 const std::vector<double>& rawRates2,
00495 double fullRate2,
00496 std::vector<double>& prescales,
00497 uint32 psIndex, double lowBound)
00498 {
00499
00500 bool resetToOneDone = false;
00501 double baseFOM = calcFigureOfMerit(rawRates1, fullRate1,
00502 rawRates2, fullRate2,
00503 prescales);
00504 double fom = baseFOM;
00505 double lastFOM = fom * 2;
00506 int loopCount = 0;
00507 if (lowBound < 1.0) {lowBound = 1.0;}
00508 if (prescales[psIndex] <= 1.0) {return false;}
00509
00510
00511
00512
00513 while (fom < lastFOM && prescales[psIndex] >= lowBound &&
00514 loopCount < 1000) {
00515 ++loopCount;
00516 lastFOM = fom;
00517
00518
00519 prescales[psIndex] *= 0.99;
00520 if (! resetToOneDone && prescales[psIndex] < 1.0) {
00521 resetToOneDone = true;
00522 prescales[psIndex] = 1.0;
00523 }
00524 fom = calcFigureOfMerit(rawRates1, fullRate1,
00525 rawRates2, fullRate2,
00526 prescales);
00527 }
00528
00529
00530
00531 prescales[psIndex] /= 0.99;
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547 return (loopCount > 1);
00548 }
00549
00555 double RateLimiter::calcFigureOfMerit(const std::vector<double>& rawRates1,
00556 double fullRate1,
00557 const std::vector<double>& rawRates2,
00558 double fullRate2,
00559 const std::vector<double>& prescales)
00560 {
00561 double sum1 = 0.0;
00562 for (uint32 idx = 0; idx < rawRates1.size(); ++idx) {
00563 sum1 += (rawRates1[idx] / prescales[idx]);
00564 }
00565 double delta1 = sum1 - fullRate1;
00566 double ratio1 = delta1 / fullRate1;
00567
00568 if (ratio1 > 0.0) {ratio1 *= 5.0;}
00569
00570 double sum2 = 0.0;
00571 for (uint32 idx = 0; idx < rawRates2.size(); ++idx) {
00572 sum2 += (rawRates2[idx] / prescales[idx]);
00573 }
00574 double delta2 = sum2 - fullRate2;
00575 double ratio2 = delta2 / fullRate2;
00576
00577 if (ratio2 > 0.0) {ratio2 *= 5.0;}
00578
00579 return ((ratio1 * ratio1) + (ratio2 * ratio2));
00580 }