CMS 3D CMS Logo

RateLimiter.cc

Go to the documentation of this file.
00001 
00005 #include "EventFilter/StorageManager/interface/RateLimiter.h"
00006 #include "EventFilter/StorageManager/interface/BaseCounter.h"
00007 
00008 using namespace stor;
00009 
00015 RateLimiter::RateLimiter(double maxEventRate, double maxDataRate)
00016 {
00017   this->maxEventRate_ = maxEventRate;
00018   this->maxDataRate_ = maxDataRate;
00019 
00020   generator_.reset(new boost::uniform_01<boost::mt19937>(baseGenerator_));
00021 }
00022 
00026 void RateLimiter::addConsumer(uint32 consumerId)
00027 {
00028   boost::mutex::scoped_lock sl(dataMutex_);
00029 
00030   // add the consumer to our internal list
00031   consumerList_.push_back(consumerId);
00032 
00033   // create a rate analyzer for this consumer
00034   boost::shared_ptr<RollingIntervalCounter>
00035     analyzer(new RollingIntervalCounter(180.0, 5.0, 10.0));
00036   dataRateTable_[consumerId] = analyzer;
00037 }
00038 
00042 void RateLimiter::removeConsumer(uint32 consumerId)
00043 {
00044   boost::mutex::scoped_lock sl(dataMutex_);
00045 
00046   // discard the rate analyzer for the consumer
00047   dataRateTable_.erase(consumerId);
00048 
00049   // remove the consumer from our internal list
00050   std::vector<uint32>::iterator vecIter =
00051     std::find(consumerList_.begin(), consumerList_.end(), consumerId);
00052   consumerList_.erase(vecIter);
00053 }
00054 
00062 //uint32 debugCounter = 0;
00063 std::vector<uint32> RateLimiter::
00064 getAllowedConsumersFromList(double dataSize,
00065                             const std::vector<uint32>& candidateList)
00066 {
00067   boost::mutex::scoped_lock sl(dataMutex_);
00068   //++debugCounter;
00069 
00070   // initialization
00071   boost::shared_ptr<RollingIntervalCounter> rateAnalyzer;
00072   std::vector<uint32> allowedList;
00073   double now = BaseCounter::getCurrentTime();
00074 
00075   // nothing to do if there are no candidate consumers
00076   if (candidateList.size() == 0) {
00077     return allowedList;
00078   }
00079 
00080   // update the rate calculations for each of the candidate consumers
00081   for (uint32 idx = 0; idx < candidateList.size(); ++idx) {
00082     uint32 consumerId = candidateList[idx];
00083     rateAnalyzer = dataRateTable_[consumerId];
00084     if (rateAnalyzer.get() != 0) {
00085       rateAnalyzer->addSample(dataSize, now);
00086     }
00087   }
00088 
00089   // delare the lists of raw rates and target prescales that
00090   // will be used while running the fair-share algorithm
00091   uint32 consumerCount = consumerList_.size();
00092   std::vector<double> rawEventRates;
00093   std::vector<double> rawDataRates;
00094   std::vector<double> eventPrescales;
00095   std::vector<double> dataPrescales;
00096 
00097   // fetch the current event and data rates for all consumers
00098   for (uint32 idx = 0; idx < consumerCount; ++idx) {
00099     uint32 consumerId = consumerList_[idx];
00100     rateAnalyzer = dataRateTable_[consumerId];
00101 
00102     double eventRate = rateAnalyzer->getSampleRate(now);
00103     //std::cout << "Event rate = " << eventRate << std::endl;
00104     if (eventRate > 0.0) {
00105       // event rate is reasonable, so use it
00106       rawEventRates.push_back(eventRate);
00107     }
00108     //else if (eventRate < 0.0) {
00109     else if (! rateAnalyzer->hasValidResult()) {
00110       // event rate has not yet been determined, use a suitable large value
00111       // as a conservative guess
00112       rawEventRates.push_back(10.0 * maxEventRate_ / consumerCount);
00113     }
00114     else {
00115       // the event rate is so low that it appears to have fallen to zero
00116       // (we use a suitable small value here to avoid having zero in the
00117       // rawEventRates list)
00118       rawEventRates.push_back(0.1 * maxEventRate_ / consumerCount);
00119     }
00120 
00121     double dataRate = rateAnalyzer->getValueRate(now);
00122     if (dataRate > 0.0) {
00123       // data rate is reasonable, so use it
00124       rawDataRates.push_back(dataRate);
00125     }
00126     //else if (dataRate < 0.0) {
00127     else if (! rateAnalyzer->hasValidResult()) {
00128       // data rate has not yet been determined, use a suitable large value
00129       // as a conservative guess
00130       rawDataRates.push_back(10.0 * maxDataRate_ / consumerCount);
00131     }
00132     else {
00133       // the data rate is so low that it appears to have fallen to zero
00134       // (we use a suitable small value here to avoid having zero in the
00135       // rawDataRates list)
00136       rawDataRates.push_back(0.1 * maxDataRate_ / consumerCount);
00137     }
00138 
00139     //if ((debugCounter % 500) == 0) {
00140     //  std::cout << "Consumer ID " << consumerId
00141     //            << ", event rate = " << rawEventRates.back()
00142     //            << " (" << eventRate << ")"
00143     //            << ", data rate = " << rawDataRates.back()
00144     //            << " (" << dataRate << ")" << std::endl;
00145     //}
00146   }
00147 
00148   // run the fair-share algorithm from both event and data points of view
00149   determineTargetPrescales(maxEventRate_, rawEventRates, eventPrescales);
00150   determineTargetPrescales(maxDataRate_, rawDataRates, dataPrescales);
00151 
00152   // pick the appropriate prescale for each consumer
00153   std::vector<double> overallPrescales;
00154   std::vector<double> minimumPrescales;
00155   int eventLimitCount = 0;
00156   int dataLimitCount = 0;
00157   for (uint32 idx = 0; idx < consumerCount; ++idx) {
00158     if (eventPrescales[idx] > dataPrescales[idx]) {
00159       overallPrescales.push_back(eventPrescales[idx]);
00160       minimumPrescales.push_back(dataPrescales[idx]);
00161       ++eventLimitCount;
00162     }
00163     else if (dataPrescales[idx] > eventPrescales[idx]) {
00164       overallPrescales.push_back(dataPrescales[idx]);
00165       minimumPrescales.push_back(eventPrescales[idx]);
00166       ++dataLimitCount;
00167     }
00168     else {
00169       overallPrescales.push_back(eventPrescales[idx]);
00170       minimumPrescales.push_back(dataPrescales[idx]);
00171     }
00172   }
00173 
00174   //if ((debugCounter % 500) == 0) {
00175   //  for (uint32 idx = 0; idx < consumerCount; ++idx) {
00176   //    uint32 consumerId = consumerList_[idx];
00177   //    double psValue = overallPrescales[idx];
00178   //    std::cout << "Consumer ID = " << consumerId
00179   //              << ", prescale = " << psValue
00180   //              << ", (min = " << minimumPrescales[idx] << ")" << std::endl;
00181   //  }
00182   //}
00183 
00184   // if prescales have been imposed based on both event rate and data rates,
00185   // then it is likely that we can loosen the prescales a bit
00186   if (eventLimitCount > 0 && dataLimitCount > 0) {
00187     loosenPrescales(rawEventRates, maxEventRate_,
00188                     rawDataRates, maxDataRate_,
00189                     overallPrescales, minimumPrescales);
00190 
00191     //if ((debugCounter % 500) == 0) {
00192     //  for (uint32 idx = 0; idx < consumerCount; ++idx) {
00193     //    uint32 consumerId = consumerList_[idx];
00194     //    double psValue = overallPrescales[idx];
00195     //    std::cout << "Consumer ID = " << consumerId
00196     //              << ", loosened prescale = " << psValue << std::endl;
00197     //  }
00198     //}
00199   }
00200 
00201   // loop over the candidate consumers and test if we're willing to let
00202   // them have this event
00203   for (uint32 idx = 0; idx < candidateList.size(); ++idx) {
00204     uint32 consumerId = candidateList[idx];
00205     for (uint32 jdx = 0; jdx < consumerCount; ++jdx) {
00206       if (consumerList_[jdx] == consumerId) {
00207         double psValue = overallPrescales[jdx];
00208         if (psValue <= 1.0) {
00209           allowedList.push_back(consumerId);
00210         }
00211         else {
00212           double instantRatio = 1.0 / psValue;
00213           double randValue = (*generator_)();
00214           if (randValue < instantRatio) {
00215             allowedList.push_back(consumerId);
00216           }
00217         }
00218         break;
00219       }
00220     }
00221   }
00222 
00223   // return the list of allowed consumers
00224   return allowedList;
00225 }
00226 
00231 void RateLimiter::dumpData(std::ostream& outStream)
00232 {
00233   boost::shared_ptr<RollingIntervalCounter> rateAnalyzer;
00234   char nowString[32];
00235 
00236   outStream << "RateLimiter 0x" << std::hex
00237             << ((int) this) << std::dec << std::endl;
00238   sprintf(nowString, "%16.4f", BaseCounter::getCurrentTime());
00239   outStream << "  Now = " << nowString << std::endl;
00240   outStream << "  Consumers:" << std::endl;
00241   for (uint32 idx = 0; idx < consumerList_.size(); ++idx) {
00242     uint32 consumerId = consumerList_[idx];
00243     rateAnalyzer = dataRateTable_[consumerId];
00244     outStream << "    ID = " << consumerId
00245               << ", event rate = " << rateAnalyzer->getSampleRate()
00246               << ", data rate = " << rateAnalyzer->getValueRate()
00247               << std::endl;
00248   }
00249 }
00250 
00255 double RateLimiter::calcRate(std::vector<double> rates,
00256                              std::vector<double> prescales)
00257 {
00258   double sum = 0.0;
00259   for (uint32 idx = 0; idx < rates.size(); ++idx) {
00260     sum += rates[idx] / prescales[idx];
00261   }
00262   return sum;
00263 }
00264 
00270 void RateLimiter::determineTargetPrescales(double fullRate,
00271                                            const std::vector<double>& rawRates,
00272                                            std::vector<double>& targetPrescales)
00273 {
00274   uint32 rateCount = rawRates.size();
00275 
00276   // determine the target rates
00277   std::vector<double> targetRates;
00278   determineTargetRates(fullRate, rawRates, targetRates);
00279   assert(targetRates.size() == rateCount);
00280 
00281   // calculate the target prescales from the target rates
00282   targetPrescales.clear();
00283   for (uint32 idx = 0; idx < rateCount; ++idx) {
00284     if (targetRates[idx] > 0.0) {
00285       targetPrescales.push_back(rawRates[idx] / targetRates[idx]);
00286     }
00287     else {
00288       targetPrescales.push_back(1.0);
00289     }
00290   }
00291 }
00292 
00298 void RateLimiter::determineTargetRates(double fullRate,
00299                                        const std::vector<double>& rawRates,
00300                                        std::vector<double>& targetRates)
00301 {
00302   uint32 rateCount = rawRates.size();
00303   targetRates.clear();
00304 
00305   // easy check #1 - there is only one rate
00306   if (rateCount == 1) {
00307     targetRates.push_back(std::min(fullRate, rawRates[0]));
00308     return;
00309   }
00310 
00311   // easy check #2 - the sum of all rates fits within the full rate
00312   double sum = 0.0;
00313   for (uint32 idx = 0; idx < rateCount; ++idx) {
00314     sum += rawRates[idx];
00315   }
00316   if (sum <= fullRate) {
00317     targetRates.resize(rateCount);
00318     std::copy(rawRates.begin(), rawRates.end(), targetRates.begin());
00319     return;
00320   }
00321 
00322   // initialize the targetRates to negative values in preparation for
00323   // using the fairShareAlgo method
00324   for (uint32 idx = 0; idx < rateCount; ++idx) {
00325     targetRates.push_back(-1.0);
00326   }
00327 
00328   // run the fair share algorithm
00329   fairShareAlgo(fullRate, rawRates, targetRates);
00330 }
00331 
00339 void RateLimiter::fairShareAlgo(double fullRate,
00340                                 const std::vector<double>& rawRates,
00341                                 std::vector<double>& targetRates)
00342 {
00343   uint32 rateCount = rawRates.size();
00344 
00345   // count the number of target values that still need to be determined
00346   int targetCount = 0;
00347   for (uint32 idx = 0; idx < rateCount; ++idx) {
00348     if (targetRates[idx] < 0.0) {
00349       ++targetCount;
00350     }
00351   }
00352 
00353   // sanity check
00354   if (targetCount == 0) {return;}
00355 
00356   // calculate the maximum fair share for the remaining targets
00357   double fairShareRate = fullRate / (double) targetCount;
00358 
00359   // check for input rates that are less than the fair share
00360   double accomodatedRate = 0.0;
00361   int graceCount = 0;
00362   for (uint32 idx = 0; idx < rateCount; ++idx) {
00363     if (targetRates[idx] < 0.0 && rawRates[idx] < fairShareRate) {
00364       targetRates[idx] = rawRates[idx];
00365       accomodatedRate += rawRates[idx];
00366       ++graceCount;
00367     }
00368   }
00369 
00370   // if we found rates that could be fully accomodated, iterate
00371   if (graceCount > 0) {
00372     fairShareAlgo((fullRate - accomodatedRate), rawRates, targetRates);
00373   }
00374 
00375   // otherwise, fill in the remaining target rates will the fair share amount
00376   else {
00377     for (uint32 idx = 0; idx < rateCount; ++idx) {
00378       if (targetRates[idx] < 0.0) {
00379         targetRates[idx] = fairShareRate;
00380       }
00381     }
00382   }
00383 }
00384 
00390 void RateLimiter::loosenPrescales(const std::vector<double>& rawRates1,
00391                                   double fullRate1,
00392                                   const std::vector<double>& rawRates2,
00393                                   double fullRate2,
00394                                   std::vector<double>& prescales,
00395                                   const std::vector<double>& minPrescales)
00396 {
00397   // initialization
00398   double fom;
00399   double baseFOM = calcFigureOfMerit(rawRates1, fullRate1,
00400                                      rawRates2, fullRate2,
00401                                      prescales);
00402 
00403   // calculate how much a predefined change in each prescale makes to
00404   // the figure-of-merit
00405   std::vector<double> fomImpact;
00406   fomImpact.resize(prescales.size());
00407   for (uint32 idx = 0; idx < prescales.size(); ++idx) {
00408     if (prescales[idx] > 1.0) {
00409       prescales[idx] *= 0.99;
00410       fom = calcFigureOfMerit(rawRates1, fullRate1,
00411                               rawRates2, fullRate2,
00412                               prescales);
00413       if ((baseFOM - fom) >= 0.0) {
00414         fomImpact[idx] = baseFOM - fom;
00415       }
00416       else {
00417         fomImpact[idx] = fom - baseFOM;
00418       }
00419       prescales[idx] /= 0.99;
00420     }
00421     else {
00422       fomImpact[idx] = 0.0;
00423     }
00424   }
00425   //if ((debugCounter % 500) == 0) {
00426   //  printf("Impacts: ");
00427   //  for (uint32 pdx = 0; pdx < prescales.size(); pdx++) {
00428   //    printf("%8.4f", fomImpact[pdx]);
00429   //  }
00430   //  printf("\n");
00431   //}
00432 
00433   // determine a prescale ordering based on how much each prescale can impact
00434   // the figure of merit (ordered from most impact to least impact)
00435   std::vector<int> impactOrder;
00436   for (uint32 idx = 0; idx < prescales.size(); ++idx) {
00437     double maxValue = -1.0;
00438     int maxIndex = -1;
00439     for (uint32 jdx = 0; jdx < prescales.size(); ++jdx) {
00440       if (fomImpact[jdx] >= 0.0 && fomImpact[jdx] > maxValue) {
00441         maxValue = fomImpact[jdx];
00442         maxIndex = jdx;
00443       }
00444     }
00445     fomImpact[maxIndex] = -1.0;
00446     impactOrder.push_back(maxIndex);
00447   }
00448   //if ((debugCounter % 500) == 0) {
00449   //  printf("Impact order: ");
00450   //  for (uint32 pdx = 0; pdx < prescales.size(); pdx++) {
00451   //    printf(" %d", impactOrder[pdx]);
00452   //  }
00453   //  printf("\n");
00454   //}
00455 
00456   // loop through the list of prescales (in order of decreasing impact)
00457   // and vary each prescale until the figure-of-merit stops improving.  
00458   // Once we're reached an optimum value for all prescales, we're done.
00459   // (A watchdog counter is included to prevent infinite loops.)
00460   fom = baseFOM;
00461   bool allDone = false;
00462   int watchdogCounter = -1;
00463   while (! allDone && ++watchdogCounter < 10) {
00464 
00465     // assume that this will be the last pass through the set of prescales
00466     allDone = true;
00467     for (uint32 idx = 0; idx < prescales.size(); ++idx) {
00468 
00469       // fetch the index of the next prescale to consider
00470       uint32 psIdx = impactOrder[idx];
00471 
00472       // skip over prescale values of 1.0 (how could we improve on that?)
00473       if (prescales[psIdx] > 1.0) {
00474 
00475         // if changing this prescale made a difference, we should keep
00476         // going to see if other prescale changes can improve the FOM
00477         // even more
00478         if (loosenOnePrescale(rawRates1, fullRate1, rawRates2, fullRate2,
00479                               prescales, psIdx, minPrescales[psIdx])) {
00480           allDone = false;
00481         }
00482       }
00483     }
00484   }
00485 }
00486 
00492 bool RateLimiter::loosenOnePrescale(const std::vector<double>& rawRates1,
00493                                     double fullRate1,
00494                                     const std::vector<double>& rawRates2,
00495                                     double fullRate2,
00496                                     std::vector<double>& prescales,
00497                                     uint32 psIndex, double lowBound)
00498 {
00499   // initialization
00500   bool resetToOneDone = false;
00501   double baseFOM = calcFigureOfMerit(rawRates1, fullRate1,
00502                                      rawRates2, fullRate2,
00503                                      prescales);
00504   double fom = baseFOM;
00505   double lastFOM = fom * 2;
00506   int loopCount = 0;
00507   if (lowBound < 1.0) {lowBound = 1.0;}
00508   if (prescales[psIndex] <= 1.0) {return false;}
00509 
00510   // loop as long as the FOM is getting better
00511   // (The check here on loopCount is simply to prevent infinite loops,
00512   // although loopCount is used for a different purpose below.)
00513   while (fom < lastFOM && prescales[psIndex] >= lowBound &&
00514          loopCount < 1000) {
00515     ++loopCount;
00516     lastFOM = fom;
00517 
00518     // reduce the prescale under test
00519     prescales[psIndex] *= 0.99;
00520     if (! resetToOneDone && prescales[psIndex] < 1.0) {
00521       resetToOneDone = true;
00522       prescales[psIndex] = 1.0;
00523     }
00524     fom = calcFigureOfMerit(rawRates1, fullRate1,
00525                             rawRates2, fullRate2,
00526                             prescales);
00527   }
00528 
00529   // restore the prescale under test to the value that it had
00530   // when the minimum FOM was reached
00531   prescales[psIndex] /= 0.99;
00532 
00533   //if ((debugCounter % 500) == 0) {
00534   //  fom = calcFigureOfMerit(rawRates1, fullRate1,
00535   //                          rawRates2, fullRate2,
00536   //                          prescales);
00537   //  printf("FOM value = %10.4f for prescales", fom);
00538   //  for (uint32 pdx = 0; pdx < prescales.size(); ++pdx) {
00539   //    printf("%5.2f", prescales[pdx]);
00540   //  }
00541   //  double rate1 = calcRate(rawRates1, prescales);
00542   //  double rate2 = calcRate(rawRates2, prescales);
00543   //  printf("\n  Event and data rate = %6.2f %7.2f\n", rate1, rate2);
00544   //}
00545 
00546   // if we only looped once, then loosening didn't have any effect
00547   return (loopCount > 1);
00548 }
00549 
00555 double RateLimiter::calcFigureOfMerit(const std::vector<double>& rawRates1,
00556                                       double fullRate1,
00557                                       const std::vector<double>& rawRates2,
00558                                       double fullRate2,
00559                                       const std::vector<double>& prescales)
00560 {
00561   double sum1 = 0.0;
00562   for (uint32 idx = 0; idx < rawRates1.size(); ++idx) {
00563     sum1 += (rawRates1[idx] / prescales[idx]);
00564   }
00565   double delta1 = sum1 - fullRate1;
00566   double ratio1 = delta1 / fullRate1;
00567   // give an extra penalty to values more than the max
00568   if (ratio1 > 0.0) {ratio1 *= 5.0;}
00569 
00570   double sum2 = 0.0;
00571   for (uint32 idx = 0; idx < rawRates2.size(); ++idx) {
00572     sum2 += (rawRates2[idx] / prescales[idx]);
00573   }
00574   double delta2 = sum2 - fullRate2;
00575   double ratio2 = delta2 / fullRate2;
00576   // give an extra penalty to values more than the max
00577   if (ratio2 > 0.0) {ratio2 *= 5.0;}
00578 
00579   return ((ratio1 * ratio1) + (ratio2 * ratio2));
00580 }

Generated on Tue Jun 9 17:34:57 2009 for CMSSW by  doxygen 1.5.4