CMS 3D CMS Logo

EventServer.cc

Go to the documentation of this file.
00001 
00008 #include "EventFilter/StorageManager/interface/EventServer.h"
00009 #include "FWCore/Utilities/interface/DebugMacros.h"
00010 
00011 #include <iostream>
00012 #include <boost/algorithm/string/case_conv.hpp>
00013 #include "zlib.h"
00014 
00015 using namespace std;
00016 using namespace stor;
00017 using namespace edm;
00018 
00023 EventServer::EventServer(double maxEventRate, double maxDataRate,
00024                          std::string hltOutputSelection,
00025                          bool runFairShareAlgo)
00026 {
00027   // initialize counters
00028   disconnectedConsumerTestCounter_ = 0;
00029 
00030   selTableStringSize_ = 0;
00031 
00032   ltInputCounters_.clear();
00033   stInputCounters_.clear();
00034   ltAcceptCounters_.clear();
00035   stAcceptCounters_.clear();
00036   ltOutputCounters_.clear();
00037   stOutputCounters_.clear();
00038 
00039   //rateLimiter_.reset(new RateLimiter(maxEventRate, maxDataRate));
00040   this->maxEventRate_ = maxEventRate;
00041   this->maxDataRate_ = maxDataRate;
00042 
00043   // 11-Jun-2008, KAB - the fair-share algo needs better requirements AND
00044   // needs consumer subscriptions to be forwarded from the ProxyServer
00045   // to the SMs.  So, prevent its use, for now.
00046   // (To be honest, I'm not sure that we can even support dynamic switching
00047   // on and off of the FS algo.)
00048   assert(! runFairShareAlgo);
00049   this->runFairShareAlgo_ = runFairShareAlgo;
00050 
00051   this->hltOutputSelection_ = hltOutputSelection;
00052   uLong crc = crc32(0L, Z_NULL, 0);
00053   Bytef* crcbuf = (Bytef*) hltOutputSelection.data();
00054   crc = crc32(crc, crcbuf, hltOutputSelection.length());
00055   this->hltOutputModuleId_ = static_cast<uint32>(crc);
00056 
00057   outsideTimer_.reset();
00058   insideTimer_.reset();
00059 
00060   // initialize the counters that we use for statistics
00061   longTermInsideCPUTimeCounter_.reset(new ForeverCounter());
00062   shortTermInsideCPUTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00063   longTermInsideRealTimeCounter_.reset(new ForeverCounter());
00064   shortTermInsideRealTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00065   longTermOutsideCPUTimeCounter_.reset(new ForeverCounter());
00066   shortTermOutsideCPUTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00067   longTermOutsideRealTimeCounter_.reset(new ForeverCounter());
00068   shortTermOutsideRealTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00069 
00070   generator_.reset(new boost::uniform_01<boost::mt19937>(baseGenerator_));
00071 }
00072 
00076 EventServer::~EventServer()
00077 {
00078   FDEBUG(5) << "Executing destructor for event server " << std::endl;
00079 }
00080 
00084 void EventServer::addConsumer(boost::shared_ptr<ConsumerPipe> consumer)
00085 {
00086   uint32 consumerId = consumer->getConsumerId();
00087   consumerTable_[consumerId] = consumer;
00088 
00089   // add the consumer (by ID) to the rateLimiter instance that we use
00090   // to provide a fair share of the limited bandwidth to each consumer.
00091   //rateLimiter_->addConsumer(consumerId);
00092 }
00093 
00094 std::map< uint32, boost::shared_ptr<ConsumerPipe> > EventServer::getConsumerTable()
00095 {
00096   return(consumerTable_);
00097 }
00098 
00103 boost::shared_ptr<ConsumerPipe> EventServer::getConsumer(uint32 consumerId)
00104 {
00105   // initial empty pointer
00106   boost::shared_ptr<ConsumerPipe> consPtr;
00107 
00108   // lookup the consumer
00109   std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00110   consIter = consumerTable_.find(consumerId);
00111   if (consIter != consumerTable_.end())
00112   {
00113     consPtr = consIter->second;
00114   }
00115 
00116   // return the pointer
00117   return consPtr;
00118 }
00119 
00128 void EventServer::processEvent(const EventMsgView &eventView)
00129 {
00130   // do nothing if the event is empty
00131   if (eventView.size() == 0) {return;}
00132 
00133   boost::shared_ptr<ForeverCounter> ltCounter;
00134   boost::shared_ptr<RollingIntervalCounter> stCounter;
00135   std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00136   std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00137 
00138   // stop the timer that we use to measure CPU and real time outside the ES
00139   outsideTimer_.stop();
00140 
00141   // stop the timer that we use to measure CPU and real time inside the ES
00142   insideTimer_.start();
00143 
00144   // add the event to our statistics for events that are input to the ES
00145   double sizeInMB = static_cast<double>(eventView.size()) / 1048576.0;
00146   double now = BaseCounter::getCurrentTime();
00147   uint32 outputModuleId = eventView.outModId();
00148   // fetch (or create) the correct long term counter for the output module
00149   ltIter = ltInputCounters_.find(outputModuleId);
00150   if (ltIter == ltInputCounters_.end()) {
00151     ltCounter.reset(new ForeverCounter());
00152     ltInputCounters_[outputModuleId] = ltCounter;
00153   }
00154   else {
00155     ltCounter = ltIter->second;
00156   }
00157   // fetch (or create) the correct short term counter for the output module
00158   stIter = stInputCounters_.find(outputModuleId);
00159   if (stIter == stInputCounters_.end()) {
00160     stCounter.reset(new RollingIntervalCounter(180,5,20));
00161     stInputCounters_[outputModuleId] = stCounter;
00162   }
00163   else {
00164     stCounter = stIter->second;
00165   }
00166   ltCounter->addSample(sizeInMB);
00167   stCounter->addSample(sizeInMB, now);
00168 
00169   // the event must be from the correct HLT output module
00170   if (outputModuleId != hltOutputModuleId_) {
00171     // track timer statistics and start/stop timers as appropriate
00172     insideTimer_.stop();
00173     longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00174     shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00175     longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00176     shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00177     longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00178     shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00179     longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00180     shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00181     outsideTimer_.reset();
00182     insideTimer_.reset();
00183     outsideTimer_.start();
00184     return;
00185   }
00186 
00187   // if we're not running the fair-share algorithm, 
00188   // prescale events based on the input event and data rates
00189   if (! runFairShareAlgo_) {
00190     double eventRate = stCounter->getSampleRate(now);
00191     double dataRate = stCounter->getValueRate(now);
00192     double eventRatePrescale = eventRate / maxEventRate_;
00193     double dataRatePrescale = dataRate / maxDataRate_;
00194     double effectivePrescale = std::max(eventRatePrescale, dataRatePrescale);
00195     if (effectivePrescale > 1.0) {
00196       double instantRatio = 1.0 / effectivePrescale;
00197       double randValue = (*generator_)();
00198       if (randValue > instantRatio) {
00199         // track timer statistics and start/stop timers as appropriate
00200         insideTimer_.stop();
00201         longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00202         shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00203         longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00204         shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00205         longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00206         shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00207         longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00208         shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00209         outsideTimer_.reset();
00210         insideTimer_.reset();
00211         outsideTimer_.start();
00212         return;
00213       }
00214     }
00215   }
00216 
00217   // loop over the consumers in our list, and for each one check whether
00218   // it is ready for an event and if it wants this specific event.  If so,
00219   // create a local copy of the event (if not already done) and pass it
00220   // to the consumer pipe.
00221 
00222   // determine which consumers are interested in the event
00223   std::vector<uint32> candidateList;
00224   boost::shared_ptr< vector<char> > bufPtr;
00225   std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00226   for (consIter = consumerTable_.begin();
00227        consIter != consumerTable_.end();
00228        consIter++)
00229   {
00230     // test if the consumer is ready and wants the event
00231     boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00232     FDEBUG(5) << "Checking if consumer " << consPipe->getConsumerId() <<
00233       " wants event " << eventView.event() << std::endl;
00234     if (consPipe->wantsEvent(eventView) &&
00235         consPipe->isReadyForEvent(now))
00236     {
00237       candidateList.push_back(consPipe->getConsumerId());
00238       consPipe->wasConsidered(now);
00239     }
00240   }
00241 
00242   // do nothing if there are no candidates
00243   if (candidateList.size() == 0) {
00244     // track timer statistics and start/stop timers as appropriate
00245     insideTimer_.stop();
00246     longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00247     shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00248     longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00249     shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00250     longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00251     shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00252     longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00253     shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00254     outsideTimer_.reset();
00255     insideTimer_.reset();
00256     outsideTimer_.start();
00257     return;
00258   }
00259 
00260   // determine which of the candidate consumers are allowed
00261   // to receive another event at this time
00262   std::vector<uint32> allowedList;
00263   //if (runFairShareAlgo_) {
00264   //  allowedList = rateLimiter_->getAllowedConsumersFromList(sizeInMB,
00265   //                                                          candidateList);
00266   //}
00267   //else {
00268     allowedList = candidateList;
00269   //}
00270 
00271   // send the event to the allowed consumers
00272   for (uint32 idx = 0; idx < allowedList.size(); ++idx)
00273   {
00274     uint32 consumerId = allowedList[idx];
00275 
00276     // check if we need to make a local copy of the event
00277     if (bufPtr.get() == NULL)
00278     {
00279       FDEBUG(5) << "Creating a buffer for event " <<
00280         eventView.event() << std::endl;
00281 
00282       // create a local buffer of the appropriate size
00283       boost::shared_ptr< vector<char> >
00284         tmpBufPtr(new vector<char>(eventView.size()));
00285 
00286       // copy the data to the local buffer
00287       unsigned char *target = (unsigned char *) &(*tmpBufPtr)[0];
00288       unsigned char *source = eventView.startAddress();
00289       int dataSize = eventView.size();
00290       std::copy(source, source+dataSize, target);
00291 
00292       // switch buffers
00293       bufPtr.swap(tmpBufPtr);
00294 
00295       // add the event to our statistics for "unique accept" events
00296       ltIter = ltAcceptCounters_.find(outputModuleId);
00297       if (ltIter == ltAcceptCounters_.end()) {
00298         ltCounter.reset(new ForeverCounter());
00299         ltAcceptCounters_[outputModuleId] = ltCounter;
00300       }
00301       else {
00302         ltCounter = ltIter->second;
00303       }
00304       stIter = stAcceptCounters_.find(outputModuleId);
00305       if (stIter == stAcceptCounters_.end()) {
00306         stCounter.reset(new RollingIntervalCounter(180,5,20));
00307         stAcceptCounters_[outputModuleId] = stCounter;
00308       }
00309       else {
00310         stCounter = stIter->second;
00311       }
00312       ltCounter->addSample(sizeInMB);
00313       stCounter->addSample(sizeInMB, now);
00314     }
00315 
00316     // add the event to the consumer pipe
00317     boost::shared_ptr<ConsumerPipe> consPipe = getConsumer(consumerId);
00318     consPipe->putEvent(bufPtr);
00319 
00320     // add the event to our statistics for "output" events
00321     // Adding the stats once per consumer is (currently) believed
00322     // to give a more accurate picture of what is being sent out.
00323     // (Even though we only have one copy of the event internally,
00324     // it uses up bandwidth N times for N consumers.)
00325     ltIter = ltOutputCounters_.find(outputModuleId);
00326     if (ltIter == ltOutputCounters_.end()) {
00327       ltCounter.reset(new ForeverCounter());
00328       ltOutputCounters_[outputModuleId] = ltCounter;
00329     }
00330     else {
00331       ltCounter = ltIter->second;
00332     }
00333     stIter = stOutputCounters_.find(outputModuleId);
00334     if (stIter == stOutputCounters_.end()) {
00335       stCounter.reset(new RollingIntervalCounter(180,5,20));
00336       stOutputCounters_[outputModuleId] = stCounter;
00337     }
00338     else {
00339       stCounter = stIter->second;
00340     }
00341     ltCounter->addSample(sizeInMB);
00342     stCounter->addSample(sizeInMB, now);
00343   }
00344 
00345   // periodically check for disconnected consumers
00346   disconnectedConsumerTestCounter_++;
00347   if (disconnectedConsumerTestCounter_ >= 500)
00348   {
00349     // reset counter
00350     disconnectedConsumerTestCounter_ = 0;
00351 
00352     // determine which consumers have disconnected
00353     std::vector<uint32> disconnectList;
00354     std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00355     for (consIter = consumerTable_.begin();
00356          consIter != consumerTable_.end();
00357          consIter++)
00358     {
00359       boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00360       FDEBUG(5) << "Checking if consumer " << consPipe->getConsumerId() <<
00361         " has disconnected " << std::endl;
00362       if (consPipe->isDisconnected())
00363       {
00364         disconnectList.push_back(consIter->first);
00365       }
00366     }
00367 
00368     // remove disconnected consumers from the consumer table
00369     std::vector<uint32>::const_iterator listIter;
00370     for (listIter = disconnectList.begin();
00371          listIter != disconnectList.end();
00372          listIter++)
00373     {
00374       uint32 consumerId = *listIter;
00375       consumerTable_.erase(consumerId);
00376 
00377       // remove the consumer from the rateLimiter instance so that it is
00378       // no longer considered for a fair share of the allowed bandwidth
00379       //rateLimiter_->removeConsumer(consumerId);
00380     }
00381   }
00382 
00383   // track timer statistics and start/stop timers as appropriate
00384   now = BaseCounter::getCurrentTime();
00385   insideTimer_.stop();
00386   longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00387   shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00388   longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00389   shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00390   longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00391   shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00392   longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00393   shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00394   outsideTimer_.reset();
00395   insideTimer_.reset();
00396   outsideTimer_.start();
00397 }
00398 
00402 boost::shared_ptr< std::vector<char> > EventServer::getEvent(uint32 consumerId)
00403 {
00404   // initial empty buffer
00405   boost::shared_ptr< vector<char> > bufPtr;
00406 
00407   // lookup the consumer
00408   std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00409   consIter = consumerTable_.find(consumerId);
00410   if (consIter != consumerTable_.end())
00411   {
00412     boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00413     bufPtr = consPipe->getEvent();
00414   }
00415 
00416   // return the event buffer
00417   return bufPtr;
00418 }
00419 
00420 void EventServer::clearQueue()
00421 {
00422   std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00423   for (consIter = consumerTable_.begin();
00424        consIter != consumerTable_.end();
00425        consIter++)
00426   {
00427     boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00428     consPipe->clearQueue();
00429   }
00430 }
00431 
00432 void EventServer::setStreamSelectionTable(std::map<std::string, Strings> const& selTable)
00433 {
00434   streamSelectionTable_ = selTable;
00435   selTableStringSize_ = 0;
00436   std::map<std::string, Strings>::const_iterator mapIter;
00437   for (mapIter = selTable.begin(); mapIter != selTable.end(); mapIter++)
00438   {
00439     std::string streamLabel = mapIter->first;
00440     selTableStringSize_ += streamLabel.size();
00441     Strings selectionList = mapIter->second;
00442     for (unsigned int idx = 0; idx < selectionList.size(); idx++)
00443     {
00444       std::string selection = selectionList[idx];
00445       selTableStringSize_ += selection.size();
00446     }
00447   }
00448 }
00449 
00450 Strings EventServer::updateTriggerSelectionForStreams(Strings const& selectionList)
00451 {
00452   Strings modifiedList;
00453   for (unsigned int idx = 0; idx < selectionList.size(); idx++) {
00454     std::string selection = selectionList[idx];
00455     std::string lcSelection = boost::algorithm::to_lower_copy(selection);
00456     if (lcSelection.find("stream", 0) == 0) {
00457       std::string streamLabel = selection.substr(6);
00458       std::map<std::string, Strings>::const_iterator mapIter =
00459         streamSelectionTable_.find(streamLabel);
00460       if (mapIter != streamSelectionTable_.end()) {
00461         Strings streamSelectionList = mapIter->second;
00462         for (unsigned int jdx = 0; jdx < streamSelectionList.size(); jdx++) {
00463           modifiedList.push_back(streamSelectionList.at(jdx));
00464         }
00465       }
00466       else {
00467         modifiedList.push_back(selection);
00468       }
00469     }
00470     else {
00471       modifiedList.push_back(selection);
00472     }
00473   }
00474   return modifiedList;
00475 }
00476 
00481 long long EventServer::getEventCount(STATS_TIME_FRAME timeFrame,
00482                                      STATS_SAMPLE_TYPE sampleType,
00483                                      uint32 outputModuleId,
00484                                      double currentTime)
00485 {
00486   boost::shared_ptr<ForeverCounter> ltCounter;
00487   boost::shared_ptr<RollingIntervalCounter> stCounter;
00488   std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00489   std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00490 
00491   if (timeFrame == SHORT_TERM_STATS) {
00492     if (sampleType == INPUT_STATS) {
00493       stIter = stInputCounters_.find(outputModuleId);
00494       if (stIter != stInputCounters_.end()) {
00495         stCounter = stIter->second;
00496         return stCounter->getSampleCount(currentTime);
00497       }
00498       else {
00499         return 0;
00500       }
00501     }
00502     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00503       stIter = stAcceptCounters_.find(outputModuleId);
00504       if (stIter != stAcceptCounters_.end()) {
00505         stCounter = stIter->second;
00506         return stCounter->getSampleCount(currentTime);
00507       }
00508       else {
00509         return 0;
00510       }
00511     }
00512     else {
00513       stIter = stOutputCounters_.find(outputModuleId);
00514       if (stIter != stOutputCounters_.end()) {
00515         stCounter = stIter->second;
00516         return stCounter->getSampleCount(currentTime);
00517       }
00518       else {
00519         return 0;
00520       }
00521     }
00522   }
00523   else {
00524     if (sampleType == INPUT_STATS) {
00525       ltIter = ltInputCounters_.find(outputModuleId);
00526       if (ltIter != ltInputCounters_.end()) {
00527         ltCounter = ltIter->second;
00528         return ltCounter->getSampleCount();
00529       }
00530       else {
00531         return 0;
00532       }
00533     }
00534     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00535       ltIter = ltAcceptCounters_.find(outputModuleId);
00536       if (ltIter != ltAcceptCounters_.end()) {
00537         ltCounter = ltIter->second;
00538         return ltCounter->getSampleCount();
00539       }
00540       else {
00541         return 0;
00542       }
00543     }
00544     else {
00545       ltIter = ltOutputCounters_.find(outputModuleId);
00546       if (ltIter != ltOutputCounters_.end()) {
00547         ltCounter = ltIter->second;
00548         return ltCounter->getSampleCount();
00549       }
00550       else {
00551         return 0;
00552       }
00553     }
00554   }
00555 }
00556 
00561 double EventServer::getEventRate(STATS_TIME_FRAME timeFrame,
00562                                  STATS_SAMPLE_TYPE sampleType,
00563                                  uint32 outputModuleId,
00564                                  double currentTime)
00565 {
00566   boost::shared_ptr<ForeverCounter> ltCounter;
00567   boost::shared_ptr<RollingIntervalCounter> stCounter;
00568   std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00569   std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00570 
00571   if (timeFrame == SHORT_TERM_STATS) {
00572     if (sampleType == INPUT_STATS) {
00573       stIter = stInputCounters_.find(outputModuleId);
00574       if (stIter != stInputCounters_.end()) {
00575         stCounter = stIter->second;
00576         return stCounter->getSampleRate(currentTime);
00577       }
00578       else {
00579         return 0;
00580       }
00581     }
00582     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00583       stIter = stAcceptCounters_.find(outputModuleId);
00584       if (stIter != stAcceptCounters_.end()) {
00585         stCounter = stIter->second;
00586         return stCounter->getSampleRate(currentTime);
00587       }
00588       else {
00589         return 0;
00590       }
00591     }
00592     else {
00593       stIter = stOutputCounters_.find(outputModuleId);
00594       if (stIter != stOutputCounters_.end()) {
00595         stCounter = stIter->second;
00596         return stCounter->getSampleRate(currentTime);
00597       }
00598       else {
00599         return 0;
00600       }
00601     }
00602   }
00603   else {
00604     if (sampleType == INPUT_STATS) {
00605       ltIter = ltInputCounters_.find(outputModuleId);
00606       if (ltIter != ltInputCounters_.end()) {
00607         ltCounter = ltIter->second;
00608         return ltCounter->getSampleRate(currentTime);
00609       }
00610       else {
00611         return 0;
00612       }
00613     }
00614     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00615       ltIter = ltAcceptCounters_.find(outputModuleId);
00616       if (ltIter != ltAcceptCounters_.end()) {
00617         ltCounter = ltIter->second;
00618         return ltCounter->getSampleRate(currentTime);
00619       }
00620       else {
00621         return 0;
00622       }
00623     }
00624     else {
00625       ltIter = ltOutputCounters_.find(outputModuleId);
00626       if (ltIter != ltOutputCounters_.end()) {
00627         ltCounter = ltIter->second;
00628         return ltCounter->getSampleRate(currentTime);
00629       }
00630       else {
00631         return 0;
00632       }
00633     }
00634   }
00635 }
00636 
00641 double EventServer::getDataRate(STATS_TIME_FRAME timeFrame,
00642                                 STATS_SAMPLE_TYPE sampleType,
00643                                 uint32 outputModuleId,
00644                                 double currentTime)
00645 {
00646   boost::shared_ptr<ForeverCounter> ltCounter;
00647   boost::shared_ptr<RollingIntervalCounter> stCounter;
00648   std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00649   std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00650 
00651   if (timeFrame == SHORT_TERM_STATS) {
00652     if (sampleType == INPUT_STATS) {
00653       stIter = stInputCounters_.find(outputModuleId);
00654       if (stIter != stInputCounters_.end()) {
00655         stCounter = stIter->second;
00656         return stCounter->getValueRate(currentTime);
00657       }
00658       else {
00659         return 0;
00660       }
00661     }
00662     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00663       stIter = stAcceptCounters_.find(outputModuleId);
00664       if (stIter != stAcceptCounters_.end()) {
00665         stCounter = stIter->second;
00666         return stCounter->getValueRate(currentTime);
00667       }
00668       else {
00669         return 0;
00670       }
00671     }
00672     else {
00673       stIter = stOutputCounters_.find(outputModuleId);
00674       if (stIter != stOutputCounters_.end()) {
00675         stCounter = stIter->second;
00676         return stCounter->getValueRate(currentTime);
00677       }
00678       else {
00679         return 0;
00680       }
00681     }
00682   }
00683   else {
00684     if (sampleType == INPUT_STATS) {
00685       ltIter = ltInputCounters_.find(outputModuleId);
00686       if (ltIter != ltInputCounters_.end()) {
00687         ltCounter = ltIter->second;
00688         return ltCounter->getValueRate(currentTime);
00689       }
00690       else {
00691         return 0;
00692       }
00693     }
00694     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00695       ltIter = ltAcceptCounters_.find(outputModuleId);
00696       if (ltIter != ltAcceptCounters_.end()) {
00697         ltCounter = ltIter->second;
00698         return ltCounter->getValueRate(currentTime);
00699       }
00700       else {
00701         return 0;
00702       }
00703     }
00704     else {
00705       ltIter = ltOutputCounters_.find(outputModuleId);
00706       if (ltIter != ltOutputCounters_.end()) {
00707         ltCounter = ltIter->second;
00708         return ltCounter->getValueRate(currentTime);
00709       }
00710       else {
00711         return 0;
00712       }
00713     }
00714   }
00715 }
00716 
00723 double EventServer::getDuration(STATS_TIME_FRAME timeFrame,
00724                                 STATS_SAMPLE_TYPE sampleType,
00725                                 uint32 outputModuleId,
00726                                 double currentTime)
00727 {
00728   boost::shared_ptr<ForeverCounter> ltCounter;
00729   boost::shared_ptr<RollingIntervalCounter> stCounter;
00730   std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00731   std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00732 
00733   if (timeFrame == SHORT_TERM_STATS) {
00734     if (sampleType == INPUT_STATS) {
00735       stIter = stInputCounters_.find(outputModuleId);
00736       if (stIter != stInputCounters_.end()) {
00737         stCounter = stIter->second;
00738         return stCounter->getDuration(currentTime);
00739       }
00740       else {
00741         return 0;
00742       }
00743     }
00744     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00745       stIter = stAcceptCounters_.find(outputModuleId);
00746       if (stIter != stAcceptCounters_.end()) {
00747         stCounter = stIter->second;
00748         return stCounter->getDuration(currentTime);
00749       }
00750       else {
00751         return 0;
00752       }
00753     }
00754     else {
00755       stIter = stOutputCounters_.find(outputModuleId);
00756       if (stIter != stOutputCounters_.end()) {
00757         stCounter = stIter->second;
00758         return stCounter->getDuration(currentTime);
00759       }
00760       else {
00761         return 0;
00762       }
00763     }
00764   }
00765   else {
00766     if (sampleType == INPUT_STATS) {
00767       ltIter = ltInputCounters_.find(outputModuleId);
00768       if (ltIter != ltInputCounters_.end()) {
00769         ltCounter = ltIter->second;
00770         return ltCounter->getDuration(currentTime);
00771       }
00772       else {
00773         return 0;
00774       }
00775     }
00776     else if (sampleType == UNIQUE_ACCEPT_STATS) {
00777       ltIter = ltAcceptCounters_.find(outputModuleId);
00778       if (ltIter != ltAcceptCounters_.end()) {
00779         ltCounter = ltIter->second;
00780         return ltCounter->getDuration(currentTime);
00781       }
00782       else {
00783         return 0;
00784       }
00785     }
00786     else {
00787       ltIter = ltOutputCounters_.find(outputModuleId);
00788       if (ltIter != ltOutputCounters_.end()) {
00789         ltCounter = ltIter->second;
00790         return ltCounter->getDuration(currentTime);
00791       }
00792       else {
00793         return 0;
00794       }
00795     }
00796   }
00797 }
00798 
00804 double EventServer::getInternalTime(STATS_TIME_FRAME timeFrame,
00805                                     STATS_TIMING_TYPE timingType,
00806                                     double currentTime)
00807 {
00808   if (timeFrame == SHORT_TERM_STATS) {
00809     if (timingType == CPUTIME) {
00810       return shortTermInsideCPUTimeCounter_->getValueSum(currentTime);
00811     }
00812     else {
00813       return shortTermInsideRealTimeCounter_->getValueSum(currentTime);
00814     }
00815   }
00816   else {
00817     if (timingType == CPUTIME) {
00818       return longTermInsideCPUTimeCounter_->getValueSum();
00819     }
00820     else {
00821       return longTermInsideRealTimeCounter_->getValueSum();
00822     }
00823   }
00824 }
00825 
00831 double EventServer::getTotalTime(STATS_TIME_FRAME timeFrame,
00832                                  STATS_TIMING_TYPE timingType,
00833                                  double currentTime)
00834 {
00835   if (timeFrame == SHORT_TERM_STATS) {
00836     if (timingType == CPUTIME) {
00837       double insideTime =
00838         shortTermInsideCPUTimeCounter_->getValueSum(currentTime);
00839       double outsideTime =
00840         shortTermOutsideCPUTimeCounter_->getValueSum(currentTime);
00841       return (insideTime + outsideTime);
00842     }
00843     else {
00844       double insideTime =
00845         shortTermInsideRealTimeCounter_->getValueSum(currentTime);
00846       double outsideTime =
00847         shortTermOutsideRealTimeCounter_->getValueSum(currentTime);
00848       return (insideTime + outsideTime);
00849     }
00850   }
00851   else {
00852     if (timingType == CPUTIME) {
00853       double insideTime =
00854         longTermInsideCPUTimeCounter_->getValueSum();
00855       double outsideTime =
00856         longTermOutsideCPUTimeCounter_->getValueSum();
00857       return (insideTime + outsideTime);
00858     }
00859     else {
00860       double insideTime =
00861         longTermInsideRealTimeCounter_->getValueSum();
00862       double outsideTime =
00863         longTermOutsideRealTimeCounter_->getValueSum();
00864       return (insideTime + outsideTime);
00865     }
00866   }
00867 }
00868 
00874 double EventServer::getTimeFraction(STATS_TIME_FRAME timeFrame,
00875                                     STATS_TIMING_TYPE timingType,
00876                                     double currentTime)
00877 {
00878   if (timeFrame == SHORT_TERM_STATS) {
00879     if (timingType == CPUTIME) {
00880       double insideTime =
00881         shortTermInsideCPUTimeCounter_->getValueSum(currentTime);
00882       double outsideTime =
00883         shortTermOutsideCPUTimeCounter_->getValueSum(currentTime);
00884       if (outsideTime > 0.0) {
00885         return (insideTime / (insideTime + outsideTime));
00886       }
00887       else {
00888         return 0.0;
00889       }
00890     }
00891     else {
00892       double insideTime =
00893         shortTermInsideRealTimeCounter_->getValueSum(currentTime);
00894       double outsideTime =
00895         shortTermOutsideRealTimeCounter_->getValueSum(currentTime);
00896       if (outsideTime > 0.0) {
00897         return (insideTime / (insideTime + outsideTime));
00898       }
00899       else {
00900         return 0.0;
00901       }
00902     }
00903   }
00904   else {
00905     if (timingType == CPUTIME) {
00906       double insideTime =
00907         longTermInsideCPUTimeCounter_->getValueSum();
00908       double outsideTime =
00909         longTermOutsideCPUTimeCounter_->getValueSum();
00910       if (outsideTime > 0.0) {
00911         return (insideTime / (insideTime + outsideTime));
00912       }
00913       else {
00914         return 0.0;
00915       }
00916     }
00917     else {
00918       double insideTime =
00919         longTermInsideRealTimeCounter_->getValueSum();
00920       double outsideTime =
00921         longTermOutsideRealTimeCounter_->getValueSum();
00922       if (outsideTime > 0.0) {
00923         return (insideTime / (insideTime + outsideTime));
00924       }
00925       else {
00926         return 0.0;
00927       }
00928     }
00929   }
00930 }

Generated on Tue Jun 9 17:34:56 2009 for CMSSW by  doxygen 1.5.4