00001
00008 #include "EventFilter/StorageManager/interface/EventServer.h"
00009 #include "FWCore/Utilities/interface/DebugMacros.h"
00010
00011 #include <iostream>
00012 #include <boost/algorithm/string/case_conv.hpp>
00013 #include "zlib.h"
00014
00015 using namespace std;
00016 using namespace stor;
00017 using namespace edm;
00018
00023 EventServer::EventServer(double maxEventRate, double maxDataRate,
00024 std::string hltOutputSelection,
00025 bool runFairShareAlgo)
00026 {
00027
00028 disconnectedConsumerTestCounter_ = 0;
00029
00030 selTableStringSize_ = 0;
00031
00032 ltInputCounters_.clear();
00033 stInputCounters_.clear();
00034 ltAcceptCounters_.clear();
00035 stAcceptCounters_.clear();
00036 ltOutputCounters_.clear();
00037 stOutputCounters_.clear();
00038
00039
00040 this->maxEventRate_ = maxEventRate;
00041 this->maxDataRate_ = maxDataRate;
00042
00043
00044
00045
00046
00047
00048 assert(! runFairShareAlgo);
00049 this->runFairShareAlgo_ = runFairShareAlgo;
00050
00051 this->hltOutputSelection_ = hltOutputSelection;
00052 uLong crc = crc32(0L, Z_NULL, 0);
00053 Bytef* crcbuf = (Bytef*) hltOutputSelection.data();
00054 crc = crc32(crc, crcbuf, hltOutputSelection.length());
00055 this->hltOutputModuleId_ = static_cast<uint32>(crc);
00056
00057 outsideTimer_.reset();
00058 insideTimer_.reset();
00059
00060
00061 longTermInsideCPUTimeCounter_.reset(new ForeverCounter());
00062 shortTermInsideCPUTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00063 longTermInsideRealTimeCounter_.reset(new ForeverCounter());
00064 shortTermInsideRealTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00065 longTermOutsideCPUTimeCounter_.reset(new ForeverCounter());
00066 shortTermOutsideCPUTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00067 longTermOutsideRealTimeCounter_.reset(new ForeverCounter());
00068 shortTermOutsideRealTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00069
00070 generator_.reset(new boost::uniform_01<boost::mt19937>(baseGenerator_));
00071 }
00072
00076 EventServer::~EventServer()
00077 {
00078 FDEBUG(5) << "Executing destructor for event server " << std::endl;
00079 }
00080
00084 void EventServer::addConsumer(boost::shared_ptr<ConsumerPipe> consumer)
00085 {
00086 uint32 consumerId = consumer->getConsumerId();
00087 consumerTable_[consumerId] = consumer;
00088
00089
00090
00091
00092 }
00093
00094 std::map< uint32, boost::shared_ptr<ConsumerPipe> > EventServer::getConsumerTable()
00095 {
00096 return(consumerTable_);
00097 }
00098
00103 boost::shared_ptr<ConsumerPipe> EventServer::getConsumer(uint32 consumerId)
00104 {
00105
00106 boost::shared_ptr<ConsumerPipe> consPtr;
00107
00108
00109 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00110 consIter = consumerTable_.find(consumerId);
00111 if (consIter != consumerTable_.end())
00112 {
00113 consPtr = consIter->second;
00114 }
00115
00116
00117 return consPtr;
00118 }
00119
00128 void EventServer::processEvent(const EventMsgView &eventView)
00129 {
00130
00131 if (eventView.size() == 0) {return;}
00132
00133 boost::shared_ptr<ForeverCounter> ltCounter;
00134 boost::shared_ptr<RollingIntervalCounter> stCounter;
00135 std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00136 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00137
00138
00139 outsideTimer_.stop();
00140
00141
00142 insideTimer_.start();
00143
00144
00145 double sizeInMB = static_cast<double>(eventView.size()) / 1048576.0;
00146 double now = BaseCounter::getCurrentTime();
00147 uint32 outputModuleId = eventView.outModId();
00148
00149 ltIter = ltInputCounters_.find(outputModuleId);
00150 if (ltIter == ltInputCounters_.end()) {
00151 ltCounter.reset(new ForeverCounter());
00152 ltInputCounters_[outputModuleId] = ltCounter;
00153 }
00154 else {
00155 ltCounter = ltIter->second;
00156 }
00157
00158 stIter = stInputCounters_.find(outputModuleId);
00159 if (stIter == stInputCounters_.end()) {
00160 stCounter.reset(new RollingIntervalCounter(180,5,20));
00161 stInputCounters_[outputModuleId] = stCounter;
00162 }
00163 else {
00164 stCounter = stIter->second;
00165 }
00166 ltCounter->addSample(sizeInMB);
00167 stCounter->addSample(sizeInMB, now);
00168
00169
00170 if (outputModuleId != hltOutputModuleId_) {
00171
00172 insideTimer_.stop();
00173 longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00174 shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00175 longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00176 shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00177 longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00178 shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00179 longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00180 shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00181 outsideTimer_.reset();
00182 insideTimer_.reset();
00183 outsideTimer_.start();
00184 return;
00185 }
00186
00187
00188
00189 if (! runFairShareAlgo_) {
00190 double eventRate = stCounter->getSampleRate(now);
00191 double dataRate = stCounter->getValueRate(now);
00192 double eventRatePrescale = eventRate / maxEventRate_;
00193 double dataRatePrescale = dataRate / maxDataRate_;
00194 double effectivePrescale = std::max(eventRatePrescale, dataRatePrescale);
00195 if (effectivePrescale > 1.0) {
00196 double instantRatio = 1.0 / effectivePrescale;
00197 double randValue = (*generator_)();
00198 if (randValue > instantRatio) {
00199
00200 insideTimer_.stop();
00201 longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00202 shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00203 longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00204 shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00205 longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00206 shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00207 longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00208 shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00209 outsideTimer_.reset();
00210 insideTimer_.reset();
00211 outsideTimer_.start();
00212 return;
00213 }
00214 }
00215 }
00216
00217
00218
00219
00220
00221
00222
00223 std::vector<uint32> candidateList;
00224 boost::shared_ptr< vector<char> > bufPtr;
00225 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00226 for (consIter = consumerTable_.begin();
00227 consIter != consumerTable_.end();
00228 consIter++)
00229 {
00230
00231 boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00232 FDEBUG(5) << "Checking if consumer " << consPipe->getConsumerId() <<
00233 " wants event " << eventView.event() << std::endl;
00234 if (consPipe->wantsEvent(eventView) &&
00235 consPipe->isReadyForEvent(now))
00236 {
00237 candidateList.push_back(consPipe->getConsumerId());
00238 consPipe->wasConsidered(now);
00239 }
00240 }
00241
00242
00243 if (candidateList.size() == 0) {
00244
00245 insideTimer_.stop();
00246 longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00247 shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00248 longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00249 shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00250 longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00251 shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00252 longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00253 shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00254 outsideTimer_.reset();
00255 insideTimer_.reset();
00256 outsideTimer_.start();
00257 return;
00258 }
00259
00260
00261
00262 std::vector<uint32> allowedList;
00263
00264
00265
00266
00267
00268 allowedList = candidateList;
00269
00270
00271
00272 for (uint32 idx = 0; idx < allowedList.size(); ++idx)
00273 {
00274 uint32 consumerId = allowedList[idx];
00275
00276
00277 if (bufPtr.get() == NULL)
00278 {
00279 FDEBUG(5) << "Creating a buffer for event " <<
00280 eventView.event() << std::endl;
00281
00282
00283 boost::shared_ptr< vector<char> >
00284 tmpBufPtr(new vector<char>(eventView.size()));
00285
00286
00287 unsigned char *target = (unsigned char *) &(*tmpBufPtr)[0];
00288 unsigned char *source = eventView.startAddress();
00289 int dataSize = eventView.size();
00290 std::copy(source, source+dataSize, target);
00291
00292
00293 bufPtr.swap(tmpBufPtr);
00294
00295
00296 ltIter = ltAcceptCounters_.find(outputModuleId);
00297 if (ltIter == ltAcceptCounters_.end()) {
00298 ltCounter.reset(new ForeverCounter());
00299 ltAcceptCounters_[outputModuleId] = ltCounter;
00300 }
00301 else {
00302 ltCounter = ltIter->second;
00303 }
00304 stIter = stAcceptCounters_.find(outputModuleId);
00305 if (stIter == stAcceptCounters_.end()) {
00306 stCounter.reset(new RollingIntervalCounter(180,5,20));
00307 stAcceptCounters_[outputModuleId] = stCounter;
00308 }
00309 else {
00310 stCounter = stIter->second;
00311 }
00312 ltCounter->addSample(sizeInMB);
00313 stCounter->addSample(sizeInMB, now);
00314 }
00315
00316
00317 boost::shared_ptr<ConsumerPipe> consPipe = getConsumer(consumerId);
00318 consPipe->putEvent(bufPtr);
00319
00320
00321
00322
00323
00324
00325 ltIter = ltOutputCounters_.find(outputModuleId);
00326 if (ltIter == ltOutputCounters_.end()) {
00327 ltCounter.reset(new ForeverCounter());
00328 ltOutputCounters_[outputModuleId] = ltCounter;
00329 }
00330 else {
00331 ltCounter = ltIter->second;
00332 }
00333 stIter = stOutputCounters_.find(outputModuleId);
00334 if (stIter == stOutputCounters_.end()) {
00335 stCounter.reset(new RollingIntervalCounter(180,5,20));
00336 stOutputCounters_[outputModuleId] = stCounter;
00337 }
00338 else {
00339 stCounter = stIter->second;
00340 }
00341 ltCounter->addSample(sizeInMB);
00342 stCounter->addSample(sizeInMB, now);
00343 }
00344
00345
00346 disconnectedConsumerTestCounter_++;
00347 if (disconnectedConsumerTestCounter_ >= 500)
00348 {
00349
00350 disconnectedConsumerTestCounter_ = 0;
00351
00352
00353 std::vector<uint32> disconnectList;
00354 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00355 for (consIter = consumerTable_.begin();
00356 consIter != consumerTable_.end();
00357 consIter++)
00358 {
00359 boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00360 FDEBUG(5) << "Checking if consumer " << consPipe->getConsumerId() <<
00361 " has disconnected " << std::endl;
00362 if (consPipe->isDisconnected())
00363 {
00364 disconnectList.push_back(consIter->first);
00365 }
00366 }
00367
00368
00369 std::vector<uint32>::const_iterator listIter;
00370 for (listIter = disconnectList.begin();
00371 listIter != disconnectList.end();
00372 listIter++)
00373 {
00374 uint32 consumerId = *listIter;
00375 consumerTable_.erase(consumerId);
00376
00377
00378
00379
00380 }
00381 }
00382
00383
00384 now = BaseCounter::getCurrentTime();
00385 insideTimer_.stop();
00386 longTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime());
00387 shortTermInsideCPUTimeCounter_->addSample(insideTimer_.cpuTime(), now);
00388 longTermInsideRealTimeCounter_->addSample(insideTimer_.realTime());
00389 shortTermInsideRealTimeCounter_->addSample(insideTimer_.realTime(), now);
00390 longTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime());
00391 shortTermOutsideCPUTimeCounter_->addSample(outsideTimer_.cpuTime(), now);
00392 longTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime());
00393 shortTermOutsideRealTimeCounter_->addSample(outsideTimer_.realTime(), now);
00394 outsideTimer_.reset();
00395 insideTimer_.reset();
00396 outsideTimer_.start();
00397 }
00398
00402 boost::shared_ptr< std::vector<char> > EventServer::getEvent(uint32 consumerId)
00403 {
00404
00405 boost::shared_ptr< vector<char> > bufPtr;
00406
00407
00408 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00409 consIter = consumerTable_.find(consumerId);
00410 if (consIter != consumerTable_.end())
00411 {
00412 boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00413 bufPtr = consPipe->getEvent();
00414 }
00415
00416
00417 return bufPtr;
00418 }
00419
00420 void EventServer::clearQueue()
00421 {
00422 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator consIter;
00423 for (consIter = consumerTable_.begin();
00424 consIter != consumerTable_.end();
00425 consIter++)
00426 {
00427 boost::shared_ptr<ConsumerPipe> consPipe = consIter->second;
00428 consPipe->clearQueue();
00429 }
00430 }
00431
00432 void EventServer::setStreamSelectionTable(std::map<std::string, Strings> const& selTable)
00433 {
00434 streamSelectionTable_ = selTable;
00435 selTableStringSize_ = 0;
00436 std::map<std::string, Strings>::const_iterator mapIter;
00437 for (mapIter = selTable.begin(); mapIter != selTable.end(); mapIter++)
00438 {
00439 std::string streamLabel = mapIter->first;
00440 selTableStringSize_ += streamLabel.size();
00441 Strings selectionList = mapIter->second;
00442 for (unsigned int idx = 0; idx < selectionList.size(); idx++)
00443 {
00444 std::string selection = selectionList[idx];
00445 selTableStringSize_ += selection.size();
00446 }
00447 }
00448 }
00449
00450 Strings EventServer::updateTriggerSelectionForStreams(Strings const& selectionList)
00451 {
00452 Strings modifiedList;
00453 for (unsigned int idx = 0; idx < selectionList.size(); idx++) {
00454 std::string selection = selectionList[idx];
00455 std::string lcSelection = boost::algorithm::to_lower_copy(selection);
00456 if (lcSelection.find("stream", 0) == 0) {
00457 std::string streamLabel = selection.substr(6);
00458 std::map<std::string, Strings>::const_iterator mapIter =
00459 streamSelectionTable_.find(streamLabel);
00460 if (mapIter != streamSelectionTable_.end()) {
00461 Strings streamSelectionList = mapIter->second;
00462 for (unsigned int jdx = 0; jdx < streamSelectionList.size(); jdx++) {
00463 modifiedList.push_back(streamSelectionList.at(jdx));
00464 }
00465 }
00466 else {
00467 modifiedList.push_back(selection);
00468 }
00469 }
00470 else {
00471 modifiedList.push_back(selection);
00472 }
00473 }
00474 return modifiedList;
00475 }
00476
00481 long long EventServer::getEventCount(STATS_TIME_FRAME timeFrame,
00482 STATS_SAMPLE_TYPE sampleType,
00483 uint32 outputModuleId,
00484 double currentTime)
00485 {
00486 boost::shared_ptr<ForeverCounter> ltCounter;
00487 boost::shared_ptr<RollingIntervalCounter> stCounter;
00488 std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00489 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00490
00491 if (timeFrame == SHORT_TERM_STATS) {
00492 if (sampleType == INPUT_STATS) {
00493 stIter = stInputCounters_.find(outputModuleId);
00494 if (stIter != stInputCounters_.end()) {
00495 stCounter = stIter->second;
00496 return stCounter->getSampleCount(currentTime);
00497 }
00498 else {
00499 return 0;
00500 }
00501 }
00502 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00503 stIter = stAcceptCounters_.find(outputModuleId);
00504 if (stIter != stAcceptCounters_.end()) {
00505 stCounter = stIter->second;
00506 return stCounter->getSampleCount(currentTime);
00507 }
00508 else {
00509 return 0;
00510 }
00511 }
00512 else {
00513 stIter = stOutputCounters_.find(outputModuleId);
00514 if (stIter != stOutputCounters_.end()) {
00515 stCounter = stIter->second;
00516 return stCounter->getSampleCount(currentTime);
00517 }
00518 else {
00519 return 0;
00520 }
00521 }
00522 }
00523 else {
00524 if (sampleType == INPUT_STATS) {
00525 ltIter = ltInputCounters_.find(outputModuleId);
00526 if (ltIter != ltInputCounters_.end()) {
00527 ltCounter = ltIter->second;
00528 return ltCounter->getSampleCount();
00529 }
00530 else {
00531 return 0;
00532 }
00533 }
00534 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00535 ltIter = ltAcceptCounters_.find(outputModuleId);
00536 if (ltIter != ltAcceptCounters_.end()) {
00537 ltCounter = ltIter->second;
00538 return ltCounter->getSampleCount();
00539 }
00540 else {
00541 return 0;
00542 }
00543 }
00544 else {
00545 ltIter = ltOutputCounters_.find(outputModuleId);
00546 if (ltIter != ltOutputCounters_.end()) {
00547 ltCounter = ltIter->second;
00548 return ltCounter->getSampleCount();
00549 }
00550 else {
00551 return 0;
00552 }
00553 }
00554 }
00555 }
00556
00561 double EventServer::getEventRate(STATS_TIME_FRAME timeFrame,
00562 STATS_SAMPLE_TYPE sampleType,
00563 uint32 outputModuleId,
00564 double currentTime)
00565 {
00566 boost::shared_ptr<ForeverCounter> ltCounter;
00567 boost::shared_ptr<RollingIntervalCounter> stCounter;
00568 std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00569 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00570
00571 if (timeFrame == SHORT_TERM_STATS) {
00572 if (sampleType == INPUT_STATS) {
00573 stIter = stInputCounters_.find(outputModuleId);
00574 if (stIter != stInputCounters_.end()) {
00575 stCounter = stIter->second;
00576 return stCounter->getSampleRate(currentTime);
00577 }
00578 else {
00579 return 0;
00580 }
00581 }
00582 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00583 stIter = stAcceptCounters_.find(outputModuleId);
00584 if (stIter != stAcceptCounters_.end()) {
00585 stCounter = stIter->second;
00586 return stCounter->getSampleRate(currentTime);
00587 }
00588 else {
00589 return 0;
00590 }
00591 }
00592 else {
00593 stIter = stOutputCounters_.find(outputModuleId);
00594 if (stIter != stOutputCounters_.end()) {
00595 stCounter = stIter->second;
00596 return stCounter->getSampleRate(currentTime);
00597 }
00598 else {
00599 return 0;
00600 }
00601 }
00602 }
00603 else {
00604 if (sampleType == INPUT_STATS) {
00605 ltIter = ltInputCounters_.find(outputModuleId);
00606 if (ltIter != ltInputCounters_.end()) {
00607 ltCounter = ltIter->second;
00608 return ltCounter->getSampleRate(currentTime);
00609 }
00610 else {
00611 return 0;
00612 }
00613 }
00614 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00615 ltIter = ltAcceptCounters_.find(outputModuleId);
00616 if (ltIter != ltAcceptCounters_.end()) {
00617 ltCounter = ltIter->second;
00618 return ltCounter->getSampleRate(currentTime);
00619 }
00620 else {
00621 return 0;
00622 }
00623 }
00624 else {
00625 ltIter = ltOutputCounters_.find(outputModuleId);
00626 if (ltIter != ltOutputCounters_.end()) {
00627 ltCounter = ltIter->second;
00628 return ltCounter->getSampleRate(currentTime);
00629 }
00630 else {
00631 return 0;
00632 }
00633 }
00634 }
00635 }
00636
00641 double EventServer::getDataRate(STATS_TIME_FRAME timeFrame,
00642 STATS_SAMPLE_TYPE sampleType,
00643 uint32 outputModuleId,
00644 double currentTime)
00645 {
00646 boost::shared_ptr<ForeverCounter> ltCounter;
00647 boost::shared_ptr<RollingIntervalCounter> stCounter;
00648 std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00649 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00650
00651 if (timeFrame == SHORT_TERM_STATS) {
00652 if (sampleType == INPUT_STATS) {
00653 stIter = stInputCounters_.find(outputModuleId);
00654 if (stIter != stInputCounters_.end()) {
00655 stCounter = stIter->second;
00656 return stCounter->getValueRate(currentTime);
00657 }
00658 else {
00659 return 0;
00660 }
00661 }
00662 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00663 stIter = stAcceptCounters_.find(outputModuleId);
00664 if (stIter != stAcceptCounters_.end()) {
00665 stCounter = stIter->second;
00666 return stCounter->getValueRate(currentTime);
00667 }
00668 else {
00669 return 0;
00670 }
00671 }
00672 else {
00673 stIter = stOutputCounters_.find(outputModuleId);
00674 if (stIter != stOutputCounters_.end()) {
00675 stCounter = stIter->second;
00676 return stCounter->getValueRate(currentTime);
00677 }
00678 else {
00679 return 0;
00680 }
00681 }
00682 }
00683 else {
00684 if (sampleType == INPUT_STATS) {
00685 ltIter = ltInputCounters_.find(outputModuleId);
00686 if (ltIter != ltInputCounters_.end()) {
00687 ltCounter = ltIter->second;
00688 return ltCounter->getValueRate(currentTime);
00689 }
00690 else {
00691 return 0;
00692 }
00693 }
00694 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00695 ltIter = ltAcceptCounters_.find(outputModuleId);
00696 if (ltIter != ltAcceptCounters_.end()) {
00697 ltCounter = ltIter->second;
00698 return ltCounter->getValueRate(currentTime);
00699 }
00700 else {
00701 return 0;
00702 }
00703 }
00704 else {
00705 ltIter = ltOutputCounters_.find(outputModuleId);
00706 if (ltIter != ltOutputCounters_.end()) {
00707 ltCounter = ltIter->second;
00708 return ltCounter->getValueRate(currentTime);
00709 }
00710 else {
00711 return 0;
00712 }
00713 }
00714 }
00715 }
00716
00723 double EventServer::getDuration(STATS_TIME_FRAME timeFrame,
00724 STATS_SAMPLE_TYPE sampleType,
00725 uint32 outputModuleId,
00726 double currentTime)
00727 {
00728 boost::shared_ptr<ForeverCounter> ltCounter;
00729 boost::shared_ptr<RollingIntervalCounter> stCounter;
00730 std::map<uint32, boost::shared_ptr<ForeverCounter> >::iterator ltIter;
00731 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> >::iterator stIter;
00732
00733 if (timeFrame == SHORT_TERM_STATS) {
00734 if (sampleType == INPUT_STATS) {
00735 stIter = stInputCounters_.find(outputModuleId);
00736 if (stIter != stInputCounters_.end()) {
00737 stCounter = stIter->second;
00738 return stCounter->getDuration(currentTime);
00739 }
00740 else {
00741 return 0;
00742 }
00743 }
00744 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00745 stIter = stAcceptCounters_.find(outputModuleId);
00746 if (stIter != stAcceptCounters_.end()) {
00747 stCounter = stIter->second;
00748 return stCounter->getDuration(currentTime);
00749 }
00750 else {
00751 return 0;
00752 }
00753 }
00754 else {
00755 stIter = stOutputCounters_.find(outputModuleId);
00756 if (stIter != stOutputCounters_.end()) {
00757 stCounter = stIter->second;
00758 return stCounter->getDuration(currentTime);
00759 }
00760 else {
00761 return 0;
00762 }
00763 }
00764 }
00765 else {
00766 if (sampleType == INPUT_STATS) {
00767 ltIter = ltInputCounters_.find(outputModuleId);
00768 if (ltIter != ltInputCounters_.end()) {
00769 ltCounter = ltIter->second;
00770 return ltCounter->getDuration(currentTime);
00771 }
00772 else {
00773 return 0;
00774 }
00775 }
00776 else if (sampleType == UNIQUE_ACCEPT_STATS) {
00777 ltIter = ltAcceptCounters_.find(outputModuleId);
00778 if (ltIter != ltAcceptCounters_.end()) {
00779 ltCounter = ltIter->second;
00780 return ltCounter->getDuration(currentTime);
00781 }
00782 else {
00783 return 0;
00784 }
00785 }
00786 else {
00787 ltIter = ltOutputCounters_.find(outputModuleId);
00788 if (ltIter != ltOutputCounters_.end()) {
00789 ltCounter = ltIter->second;
00790 return ltCounter->getDuration(currentTime);
00791 }
00792 else {
00793 return 0;
00794 }
00795 }
00796 }
00797 }
00798
00804 double EventServer::getInternalTime(STATS_TIME_FRAME timeFrame,
00805 STATS_TIMING_TYPE timingType,
00806 double currentTime)
00807 {
00808 if (timeFrame == SHORT_TERM_STATS) {
00809 if (timingType == CPUTIME) {
00810 return shortTermInsideCPUTimeCounter_->getValueSum(currentTime);
00811 }
00812 else {
00813 return shortTermInsideRealTimeCounter_->getValueSum(currentTime);
00814 }
00815 }
00816 else {
00817 if (timingType == CPUTIME) {
00818 return longTermInsideCPUTimeCounter_->getValueSum();
00819 }
00820 else {
00821 return longTermInsideRealTimeCounter_->getValueSum();
00822 }
00823 }
00824 }
00825
00831 double EventServer::getTotalTime(STATS_TIME_FRAME timeFrame,
00832 STATS_TIMING_TYPE timingType,
00833 double currentTime)
00834 {
00835 if (timeFrame == SHORT_TERM_STATS) {
00836 if (timingType == CPUTIME) {
00837 double insideTime =
00838 shortTermInsideCPUTimeCounter_->getValueSum(currentTime);
00839 double outsideTime =
00840 shortTermOutsideCPUTimeCounter_->getValueSum(currentTime);
00841 return (insideTime + outsideTime);
00842 }
00843 else {
00844 double insideTime =
00845 shortTermInsideRealTimeCounter_->getValueSum(currentTime);
00846 double outsideTime =
00847 shortTermOutsideRealTimeCounter_->getValueSum(currentTime);
00848 return (insideTime + outsideTime);
00849 }
00850 }
00851 else {
00852 if (timingType == CPUTIME) {
00853 double insideTime =
00854 longTermInsideCPUTimeCounter_->getValueSum();
00855 double outsideTime =
00856 longTermOutsideCPUTimeCounter_->getValueSum();
00857 return (insideTime + outsideTime);
00858 }
00859 else {
00860 double insideTime =
00861 longTermInsideRealTimeCounter_->getValueSum();
00862 double outsideTime =
00863 longTermOutsideRealTimeCounter_->getValueSum();
00864 return (insideTime + outsideTime);
00865 }
00866 }
00867 }
00868
00874 double EventServer::getTimeFraction(STATS_TIME_FRAME timeFrame,
00875 STATS_TIMING_TYPE timingType,
00876 double currentTime)
00877 {
00878 if (timeFrame == SHORT_TERM_STATS) {
00879 if (timingType == CPUTIME) {
00880 double insideTime =
00881 shortTermInsideCPUTimeCounter_->getValueSum(currentTime);
00882 double outsideTime =
00883 shortTermOutsideCPUTimeCounter_->getValueSum(currentTime);
00884 if (outsideTime > 0.0) {
00885 return (insideTime / (insideTime + outsideTime));
00886 }
00887 else {
00888 return 0.0;
00889 }
00890 }
00891 else {
00892 double insideTime =
00893 shortTermInsideRealTimeCounter_->getValueSum(currentTime);
00894 double outsideTime =
00895 shortTermOutsideRealTimeCounter_->getValueSum(currentTime);
00896 if (outsideTime > 0.0) {
00897 return (insideTime / (insideTime + outsideTime));
00898 }
00899 else {
00900 return 0.0;
00901 }
00902 }
00903 }
00904 else {
00905 if (timingType == CPUTIME) {
00906 double insideTime =
00907 longTermInsideCPUTimeCounter_->getValueSum();
00908 double outsideTime =
00909 longTermOutsideCPUTimeCounter_->getValueSum();
00910 if (outsideTime > 0.0) {
00911 return (insideTime / (insideTime + outsideTime));
00912 }
00913 else {
00914 return 0.0;
00915 }
00916 }
00917 else {
00918 double insideTime =
00919 longTermInsideRealTimeCounter_->getValueSum();
00920 double outsideTime =
00921 longTermOutsideRealTimeCounter_->getValueSum();
00922 if (outsideTime > 0.0) {
00923 return (insideTime / (insideTime + outsideTime));
00924 }
00925 else {
00926 return 0.0;
00927 }
00928 }
00929 }
00930 }