#include <EventFilter/SMProxyServer/interface/DataProcessManager.h>
Definition at line 28 of file DataProcessManager.h.
typedef std::vector<char> stor::DataProcessManager::Buf |
Definition at line 34 of file DataProcessManager.h.
typedef std::map<std::string, bool> stor::DataProcessManager::HeaderConsumer_map |
Definition at line 36 of file DataProcessManager.h.
typedef std::map<std::string, struct timeval> stor::DataProcessManager::LastReqTime_map |
Definition at line 37 of file DataProcessManager.h.
typedef std::map<std::string, unsigned int> stor::DataProcessManager::RegConsumer_map |
Definition at line 35 of file DataProcessManager.h.
Definition at line 32 of file DataProcessManager.h.
00032 { EVENT_FETCH = 10, DQMEVENT_FETCH = 11 };
stor::DataProcessManager::DataProcessManager | ( | ) |
Definition at line 31 of file DataProcessManager.cc.
References init(), and pmeter_.
00031 : 00032 cmd_q_(edm::getEventBuffer(voidptr_size,50)), 00033 alreadyRegistered_(false), 00034 alreadyRegisteredDQM_(false), 00035 headerRefetchRequested_(false), 00036 buf_(2000), 00037 headerRetryInterval_(5), 00038 dqmServiceManager_(new stor::DQMServiceManager()), 00039 receivedEvents_(0), 00040 receivedDQMEvents_(0), 00041 samples_(100), 00042 period4samples_(5) 00043 { 00044 // for performance measurements 00045 pmeter_ = new stor::SMPerformanceMeter(); 00046 init(); 00047 }
stor::DataProcessManager::~DataProcessManager | ( | ) |
Definition at line 49 of file DataProcessManager.cc.
References pmeter_.
00050 { 00051 delete pmeter_; 00052 }
void stor::DataProcessManager::addDQMSM2Register | ( | std::string | DQMsmURL | ) |
Definition at line 332 of file DataProcessManager.cc.
References DQMsmList_, DQMsmRegMap_, i, and lastDQMReqMap_.
00333 { 00334 // Check if already in the list 00335 bool alreadyInList = false; 00336 if(DQMsmList_.size() > 0) { 00337 for(unsigned int i = 0; i < DQMsmList_.size(); ++i) { 00338 if(DQMsmURL.compare(DQMsmList_[i]) == 0) { 00339 alreadyInList = true; 00340 break; 00341 } 00342 } 00343 } 00344 if(alreadyInList) return; 00345 DQMsmList_.push_back(DQMsmURL); 00346 DQMsmRegMap_.insert(std::make_pair(DQMsmURL,0)); 00347 struct timeval lastRequestTime; 00348 lastRequestTime.tv_sec = 0; 00349 lastRequestTime.tv_usec = 0; 00350 lastDQMReqMap_.insert(std::make_pair(DQMsmURL,lastRequestTime)); 00351 }
void stor::DataProcessManager::addMeasurement | ( | unsigned long | size | ) |
Definition at line 1154 of file DataProcessManager.cc.
References stor::SMPerformanceMeter::addSample(), stor::SMPerformanceMeter::getStats(), pmeter_, and stats_.
Referenced by getOneDQMEventFromSM(), and getOneEventFromSM().
01155 { 01156 // for bandwidth performance measurements 01157 if(pmeter_->addSample(size)) 01158 { 01159 stats_ = pmeter_->getStats(); 01160 } 01161 }
void stor::DataProcessManager::addSM2Register | ( | std::string | smURL | ) |
Definition at line 308 of file DataProcessManager.cc.
References i, lastReqMap_, smHeaderMap_, smList_, smRegMap_, and updateMinEventRequestInterval().
00309 { 00310 // This smURL is the URN of the StorageManager without the page extension 00311 // Check if already in the list 00312 bool alreadyInList = false; 00313 if(smList_.size() > 0) { 00314 for(unsigned int i = 0; i < smList_.size(); ++i) { 00315 if(smURL.compare(smList_[i]) == 0) { 00316 alreadyInList = true; 00317 break; 00318 } 00319 } 00320 } 00321 if(alreadyInList) return; 00322 smList_.push_back(smURL); 00323 smRegMap_.insert(std::make_pair(smURL,0)); 00324 smHeaderMap_.insert(std::make_pair(smURL,false)); 00325 struct timeval lastRequestTime; 00326 lastRequestTime.tv_sec = 0; 00327 lastRequestTime.tv_usec = 0; 00328 lastReqMap_.insert(std::make_pair(smURL,lastRequestTime)); 00329 updateMinEventRequestInterval(); 00330 }
bool stor::DataProcessManager::getAnyHeaderFromSM | ( | ) | [private] |
Definition at line 556 of file DataProcessManager.cc.
References getHeaderFromSM(), i, smList_, and smRegMap_.
00557 { 00558 // Try the list of SM in order of registration to get one Header 00559 bool gotOneHeader = false; 00560 if(smList_.size() > 0) { 00561 for(unsigned int i = 0; i < smList_.size(); ++i) { 00562 if(smRegMap_[smList_[i] ] > 0) { 00563 bool success = getHeaderFromSM(smList_[i]); 00564 if(success) { // should cleam this up! 00565 gotOneHeader = true; 00566 return gotOneHeader; 00567 } 00568 } 00569 } 00570 } else { 00571 // this is a problem (but maybe not with non-blocking processing loop) 00572 return false; 00573 } 00574 return gotOneHeader; 00575 }
double stor::DataProcessManager::getAverageValue | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_TIMING_TYPE | timingType = EVENT_FETCH , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Definition at line 1185 of file DataProcessManager.cc.
References DQMEVENT_FETCH, ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, SHORT_TERM, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.
01188 { 01189 if (timeFrame == SHORT_TERM) { 01190 if (timingType == DQMEVENT_FETCH) { 01191 return stDQMFetchTimeCounter_->getValueAverage(currentTime); 01192 } 01193 else { 01194 return stEventFetchTimeCounter_->getValueAverage(currentTime); 01195 } 01196 } 01197 else { 01198 if (timingType == DQMEVENT_FETCH) { 01199 return ltDQMFetchTimeCounter_->getValueAverage(); 01200 } 01201 else { 01202 return ltEventFetchTimeCounter_->getValueAverage(); 01203 } 01204 } 01205 }
edm::EventBuffer& stor::DataProcessManager::getCommandQueue | ( | ) | [inline] |
void stor::DataProcessManager::getDQMEventFromAllSM | ( | ) | [private] |
Definition at line 984 of file DataProcessManager.cc.
References DQMsmRegMap_, getOneDQMEventFromSM(), i, and smList_.
Referenced by processCommands().
00985 { 00986 // Try the list of SM in order of registration to get one event 00987 // so long as we have the header from SM already 00988 if(smList_.size() > 0) { 00989 double time2wait = 0.0; 00990 double sleepTime = 300.0; 00991 bool gotOneEvent = false; 00992 bool gotOne = false; 00993 for(unsigned int i = 0; i < smList_.size(); ++i) { 00994 if(DQMsmRegMap_[smList_[i] ] > 0) { // is registered 00995 gotOne = getOneDQMEventFromSM(smList_[i], time2wait); 00996 if(gotOne) { 00997 gotOneEvent = true; 00998 } else { 00999 if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait; 01000 } 01001 } 01002 } 01003 // check if we need to sleep (to enforce the allowed request rate) 01004 // we don't want to ping the StorageManager app too often 01005 // TODO fixme: Cannot sleep for DQM as this is a long time usually 01006 // and we block the event request poll if we sleep! 01007 // have to find out how to ensure the correct poll rate 01008 if(!gotOneEvent) { 01009 //if(sleepTime > 0.0) usleep(static_cast<int>(1000000 * sleepTime)); 01010 } 01011 } 01012 }
boost::shared_ptr<DQMEventServer>& stor::DataProcessManager::getDQMEventServer | ( | ) | [inline] |
Definition at line 94 of file DataProcessManager.h.
References DQMeventServer_.
00094 { return DQMeventServer_; }
boost::shared_ptr<stor::DQMServiceManager>& stor::DataProcessManager::getDQMServiceManager | ( | ) | [inline] |
Definition at line 95 of file DataProcessManager.h.
References dqmServiceManager_.
00095 { return dqmServiceManager_; }
double stor::DataProcessManager::getDQMTime2Wait | ( | std::string | smURL | ) | [private] |
Definition at line 1014 of file DataProcessManager.cc.
References lastDQMReqMap_, and minDQMEventRequestInterval_.
Referenced by getOneDQMEventFromSM().
01015 { 01016 // calculate time since last ping of this SM in seconds 01017 struct timeval now; 01018 struct timezone dummyTZ; 01019 gettimeofday(&now, &dummyTZ); 01020 double timeDiff = (double) now.tv_sec; 01021 timeDiff -= (double) lastDQMReqMap_[smURL].tv_sec; 01022 timeDiff += ((double) now.tv_usec / 1000000.0); 01023 timeDiff -= ((double) lastDQMReqMap_[smURL].tv_usec / 1000000.0); 01024 if (timeDiff < minDQMEventRequestInterval_) 01025 { 01026 return (minDQMEventRequestInterval_ - timeDiff); 01027 } 01028 else 01029 { 01030 return 0.0; 01031 } 01032 }
double stor::DataProcessManager::getDuration | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_TIMING_TYPE | timingType = EVENT_FETCH , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Definition at line 1207 of file DataProcessManager.cc.
References DQMEVENT_FETCH, ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, SHORT_TERM, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.
01210 { 01211 if (timeFrame == SHORT_TERM) { 01212 if (timingType == DQMEVENT_FETCH) { 01213 return stDQMFetchTimeCounter_->getDuration(currentTime); 01214 } 01215 else { 01216 return stEventFetchTimeCounter_->getDuration(currentTime); 01217 } 01218 } 01219 else { 01220 if (timingType == DQMEVENT_FETCH) { 01221 return ltDQMFetchTimeCounter_->getDuration(currentTime); 01222 } 01223 else { 01224 return ltEventFetchTimeCounter_->getDuration(currentTime); 01225 } 01226 } 01227 }
void stor::DataProcessManager::getEventFromAllSM | ( | ) | [private] |
Definition at line 784 of file DataProcessManager.cc.
References getOneEventFromSM(), haveHeader(), i, smList_, and smRegMap_.
Referenced by processCommands().
00785 { 00786 // Try the list of SM in order of registration to get one event 00787 // so long as we have the header from SM already 00788 if(smList_.size() > 0 && haveHeader()) { 00789 double time2wait = 0.0; 00790 double sleepTime = 300.0; 00791 bool gotOneEvent = false; 00792 bool gotOne = false; 00793 for(unsigned int i = 0; i < smList_.size(); ++i) { 00794 if(smRegMap_[smList_[i] ] > 0) { // is registered 00795 gotOne = getOneEventFromSM(smList_[i], time2wait); 00796 if(gotOne) { 00797 gotOneEvent = true; 00798 } else { 00799 if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait; 00800 } 00801 } 00802 } 00803 // check if we need to sleep (to enforce the allowed request rate) 00804 // we don't want to ping the StorageManager app too often 00805 if(!gotOneEvent) { 00806 if(sleepTime > 0.0) usleep(static_cast<int>(1000000 * sleepTime)); 00807 } 00808 } 00809 }
boost::shared_ptr<EventServer>& stor::DataProcessManager::getEventServer | ( | ) | [inline] |
Definition at line 51 of file DataProcessManager.h.
References eventServer_.
00051 { return eventServer_; }
bool stor::DataProcessManager::getHeaderFromAllSM | ( | ) | [private] |
Definition at line 577 of file DataProcessManager.cc.
References getHeaderFromSM(), i, smHeaderMap_, smList_, and smRegMap_.
Referenced by processCommands().
00578 { 00579 // Try the list of SM in order of registration to get one Header from each 00580 // TODO: how do we get multiple headers if there are more than one? 00581 bool gotAllHeaders = true; 00582 if(smList_.size() > 0) { 00583 for(unsigned int i = 0; i < smList_.size(); ++i) { 00584 if(smRegMap_[smList_[i] ] > 0) { // is registered 00585 if(smHeaderMap_[smList_[i] ]) continue; // already got header 00586 bool success = getHeaderFromSM(smList_[i]); 00587 if(success) { 00588 smHeaderMap_[smList_[i] ] = true; 00589 } else { 00590 gotAllHeaders = false; 00591 } 00592 } else { 00593 gotAllHeaders = false; 00594 } 00595 } 00596 } else { 00597 return false; 00598 } 00599 return gotAllHeaders; 00600 }
bool stor::DataProcessManager::getHeaderFromSM | ( | std::string | smURL | ) | [private] |
Definition at line 602 of file DataProcessManager.cc.
References OtherMessageView::bodySize(), TestMuL1L2Filter_cff::cerr, HeaderView::code(), convert(), stor::ReadData::d_, data, lat::endl(), eventServer_, Exception, FDEBUG, stor::func(), Header::HEADER_REQUEST, headerpage_, Header::INIT, Header::INIT_SET, initMsgCollection_, len, OtherMessageBuilder::msgBody(), OtherMessageView::msgBody(), NULL, stor::setopt(), OtherMessageBuilder::size(), InitMsgView::size(), smRegMap_, and OtherMessageBuilder::startAddress().
Referenced by getAnyHeaderFromSM(), and getHeaderFromAllSM().
00603 { 00604 // One single try to get a header from this SM URL 00605 stor::ReadData data; 00606 00607 data.d_.clear(); 00608 CURL* han = curl_easy_init(); 00609 if(han==0) 00610 { 00611 edm::LogError("getHeaderFromSM") << "Could not create curl handle"; 00612 // this is a fatal error isn't it? Are we catching this? TODO 00613 throw cms::Exception("getHeaderFromSM","DataProcessManager") 00614 << "Unable to create curl handle\n"; 00615 } 00616 // set the standard https request options 00617 std::string url2use = smURL + headerpage_; 00618 setopt(han,CURLOPT_URL,url2use.c_str()); 00619 setopt(han,CURLOPT_WRITEFUNCTION,func); 00620 setopt(han,CURLOPT_WRITEDATA,&data); 00621 00622 // send our consumer ID as part of the header request 00623 char msgBuff[100]; 00624 OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST, 00625 sizeof(char_uint32)); 00626 uint8 *bodyPtr = requestMessage.msgBody(); 00627 convert(smRegMap_[smURL], bodyPtr); 00628 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00629 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00630 struct curl_slist *headers=NULL; 00631 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00632 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00633 setopt(han, CURLOPT_HTTPHEADER, headers); 00634 00635 // send the HTTP POST, read the reply, and cleanup before going on 00636 CURLcode messageStatus = curl_easy_perform(han); 00637 curl_slist_free_all(headers); 00638 curl_easy_cleanup(han); 00639 00640 if(messageStatus!=0) 00641 { 00642 cerr << "curl perform failed for header" << endl; 00643 edm::LogError("getHeaderFromSM") << "curl perform failed for header. " 00644 << "Could not get header from an already registered Storage Manager" 00645 << " at " << smURL; 00646 return false; 00647 } 00648 if(data.d_.length() == 0) 00649 { 00650 return false; 00651 } 00652 00653 // rely on https transfer string of correct length! 00654 int len = data.d_.length(); 00655 FDEBUG(9) << "getHeaderFromSM received registry len = " << len << std::endl; 00656 00657 // check that we've received a valid INIT message 00658 // or a set of INIT messages. Save everything that we receive. 00659 bool addedNewInitMsg = false; 00660 try 00661 { 00662 HeaderView hdrView(&data.d_[0]); 00663 if (hdrView.code() == Header::INIT) 00664 { 00665 InitMsgView initView(&data.d_[0]); 00666 if (initMsgCollection_->addIfUnique(initView)) 00667 { 00668 addedNewInitMsg = true; 00669 } 00670 } 00671 else if (hdrView.code() == Header::INIT_SET) 00672 { 00673 OtherMessageView otherView(&data.d_[0]); 00674 bodyPtr = otherView.msgBody(); 00675 uint32 fullSize = otherView.bodySize(); 00676 while ((unsigned int) (bodyPtr-otherView.msgBody()) < fullSize) 00677 { 00678 InitMsgView initView(bodyPtr); 00679 if (initMsgCollection_->addIfUnique(initView)) 00680 { 00681 addedNewInitMsg = true; 00682 } 00683 bodyPtr += initView.size(); 00684 } 00685 } 00686 else 00687 { 00688 throw cms::Exception("getHeaderFromSM", "DataProcessManager"); 00689 } 00690 } 00691 catch (cms::Exception excpt) 00692 { 00693 const unsigned int MAX_DUMP_LENGTH = 1000; 00694 edm::LogError("getHeaderFromSM") << "========================================"; 00695 edm::LogError("getHeaderFromSM") << "Exception decoding the getRegistryData response!"; 00696 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00697 edm::LogError("getHeaderFromSM") << "Here is the raw text that was returned:"; 00698 edm::LogError("getHeaderFromSM") << data.d_; 00699 } 00700 else { 00701 edm::LogError("getHeaderFromSM") << "Here are the first " << MAX_DUMP_LENGTH << 00702 " characters of the raw text that was returned:"; 00703 edm::LogError("getHeaderFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH)); 00704 } 00705 edm::LogError("getHeaderFromSM") << "========================================"; 00706 throw excpt; 00707 } 00708 00709 // check if any currently connected consumers did not specify 00710 // an HLT output module label and we now have multiple, different, 00711 // INIT messages. If so, we need to complain because the 00712 // SelectHLTOutput parameter needs to be specified when there 00713 // is more than one HLT output module (and correspondingly, more 00714 // than one INIT message) 00715 if (addedNewInitMsg && eventServer_.get() != NULL && 00716 initMsgCollection_->size() > 1) 00717 { 00718 std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable = 00719 eventServer_->getConsumerTable(); 00720 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator 00721 consumerIter; 00722 for (consumerIter = consumerTable.begin(); 00723 consumerIter != consumerTable.end(); 00724 consumerIter++) 00725 { 00726 boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second; 00727 00728 if (consPtr->getHLTOutputSelection().empty()) 00729 { 00730 // store a warning message in the consumer pipe to be 00731 // sent to the consumer at the next opportunity 00732 std::string errorString; 00733 errorString.append("ERROR: The configuration for this "); 00734 errorString.append("consumer does not specify an HLT output "); 00735 errorString.append("module.\nPlease specify one of the HLT "); 00736 errorString.append("output modules listed below as the "); 00737 errorString.append("SelectHLTOutput parameter "); 00738 errorString.append("in the InputSource configuration.\n"); 00739 errorString.append(initMsgCollection_->getSelectionHelpString()); 00740 errorString.append("\n"); 00741 consPtr->setRegistryWarning(errorString); 00742 } 00743 } 00744 } 00745 00746 return true; 00747 }
boost::shared_ptr<InitMsgCollection>& stor::DataProcessManager::getInitMsgCollection | ( | ) | [inline] |
Definition at line 57 of file DataProcessManager.h.
References initMsgCollection_.
00057 { return initMsgCollection_; }
bool stor::DataProcessManager::getOneDQMEventFromSM | ( | std::string | smURL, | |
double & | time2wait | |||
) | [private] |
Definition at line 1042 of file DataProcessManager.cc.
References addMeasurement(), buf_, TestMuL1L2Filter_cff::cerr, OtherMessageView::code(), convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, Header::DONE, Header::DQMEVENT_REQUEST, DQMeventpage_, dqmFetchTimer_, dqmServiceManager_, DQMsmRegMap_, lat::endl(), eventFetchTimer_, Exception, FDEBUG, stor::func(), getDQMTime2Wait(), i, len, ltDQMFetchTimeCounter_, OtherMessageBuilder::msgBody(), NULL, edm::CPUTimer::realTime(), receivedDQMEvents_, edm::CPUTimer::reset(), setDQMTime2Now(), stor::setopt(), OtherMessageBuilder::size(), edm::CPUTimer::start(), OtherMessageBuilder::startAddress(), stDQMFetchTimeCounter_, and edm::CPUTimer::stop().
Referenced by getDQMEventFromAllSM().
01043 { 01044 // See if we will exceed the request rate, if so just return false 01045 // Return values: 01046 // true = we have an event; false = no event (whatever reason) 01047 // time2wait values: 01048 // 0.0 = we pinged this SM this time; >0 = did not ping, wait this time 01049 // if every SM returns false we sleep some time 01050 time2wait = getDQMTime2Wait(smURL); 01051 if(time2wait > 0.0) { 01052 return false; 01053 } else { 01054 setDQMTime2Now(smURL); 01055 } 01056 01057 // One single try to get a event from this SM URL 01058 stor::ReadData data; 01059 01060 // start a measurement of how long the HTTP POST takes 01061 dqmFetchTimer_.stop(); 01062 dqmFetchTimer_.reset(); 01063 dqmFetchTimer_.start(); 01064 01065 data.d_.clear(); 01066 CURL* han = curl_easy_init(); 01067 if(han==0) 01068 { 01069 edm::LogError("getOneDQMEventFromSM") << "Could not create curl handle"; 01070 // this is a fatal error isn't it? Are we catching this? TODO 01071 throw cms::Exception("getOneDQMEventFromSM","DataProcessManager") 01072 << "Unable to create curl handle\n"; 01073 } 01074 // set the standard https request options 01075 std::string url2use = smURL + DQMeventpage_; 01076 setopt(han,CURLOPT_URL,url2use.c_str()); 01077 setopt(han,CURLOPT_WRITEFUNCTION,stor::func); 01078 setopt(han,CURLOPT_WRITEDATA,&data); 01079 01080 // send our consumer ID as part of the event request 01081 char msgBuff[100]; 01082 OtherMessageBuilder requestMessage(&msgBuff[0], Header::DQMEVENT_REQUEST, 01083 sizeof(char_uint32)); 01084 uint8 *bodyPtr = requestMessage.msgBody(); 01085 convert(DQMsmRegMap_[smURL], bodyPtr); 01086 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 01087 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 01088 struct curl_slist *headers=NULL; 01089 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 01090 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 01091 stor::setopt(han, CURLOPT_HTTPHEADER, headers); 01092 01093 // send the HTTP POST, read the reply, and cleanup before going on 01094 CURLcode messageStatus = curl_easy_perform(han); 01095 curl_slist_free_all(headers); 01096 curl_easy_cleanup(han); 01097 01098 if(messageStatus!=0) 01099 { 01100 cerr << "curl perform failed for DQM event" << endl; 01101 edm::LogError("getOneDQMEventFromSM") << "curl perform failed for DQM event. " 01102 << "Could not get DQMevent from an already registered Storage Manager" 01103 << " at " << smURL; 01104 01105 // keep statistics for all HTTP POSTS 01106 dqmFetchTimer_.stop(); 01107 ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 01108 stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 01109 01110 return false; 01111 } 01112 01113 // rely on https transfer string of correct length! 01114 int len = data.d_.length(); 01115 FDEBUG(9) << "getOneDQMEventFromSM received len = " << len << std::endl; 01116 if(data.d_.length() == 0) 01117 { 01118 // keep statistics for all HTTP POSTS 01119 dqmFetchTimer_.stop(); 01120 ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 01121 stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 01122 01123 return false; 01124 } 01125 01126 buf_.resize(len); 01127 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 01128 01129 // keep statistics for all HTTP POSTS 01130 dqmFetchTimer_.stop(); 01131 ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 01132 stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 01133 01134 // first check if done message 01135 OtherMessageView msgView(&buf_[0]); 01136 01137 if (msgView.code() == Header::DONE) { 01138 // TODO fixme:just print message for now 01139 std::cout << " SM " << smURL << " has halted" << std::endl; 01140 return false; 01141 } else { 01142 DQMEventMsgView dqmEventView(&buf_[0]); 01143 ++receivedDQMEvents_; 01144 addMeasurement((unsigned long)data.d_.length()); 01145 if(dqmServiceManager_.get() != NULL) { 01146 dqmServiceManager_->manageDQMEventMsg(dqmEventView); 01147 return true; 01148 } 01149 } 01150 return false; 01151 }
bool stor::DataProcessManager::getOneEventFromSM | ( | std::string | smURL, | |
double & | time2wait | |||
) | [private] |
Definition at line 839 of file DataProcessManager.cc.
References addMeasurement(), buf_, TestMuL1L2Filter_cff::cerr, HeaderView::code(), OtherMessageView::code(), convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, Header::DONE, lat::endl(), Header::EVENT, Header::EVENT_REQUEST, eventFetchTimer_, eventpage_, eventServer_, Exception, FDEBUG, stor::func(), getTime2Wait(), headerRefetchRequested_, i, initMsgCollection_, len, ltEventFetchTimeCounter_, OtherMessageBuilder::msgBody(), Header::NEW_INIT_AVAILABLE, NULL, edm::CPUTimer::realTime(), receivedEvents_, edm::CPUTimer::reset(), stor::setopt(), setTime2Now(), OtherMessageBuilder::size(), smRegMap_, edm::CPUTimer::start(), OtherMessageBuilder::startAddress(), stEventFetchTimeCounter_, and edm::CPUTimer::stop().
Referenced by getEventFromAllSM().
00840 { 00841 // See if we will exceed the request rate, if so just return false 00842 // Return values: 00843 // true = we have an event; false = no event (whatever reason) 00844 // time2wait values: 00845 // 0.0 = we pinged this SM this time; >0 = did not ping, wait this time 00846 // if every SM returns false we sleep some time 00847 time2wait = getTime2Wait(smURL); 00848 if(time2wait > 0.0) { 00849 return false; 00850 } else { 00851 setTime2Now(smURL); 00852 } 00853 00854 // One single try to get a event from this SM URL 00855 stor::ReadData data; 00856 00857 // start a measurement of how long the HTTP POST takes 00858 eventFetchTimer_.stop(); 00859 eventFetchTimer_.reset(); 00860 eventFetchTimer_.start(); 00861 00862 data.d_.clear(); 00863 CURL* han = curl_easy_init(); 00864 if(han==0) 00865 { 00866 edm::LogError("getOneEventFromSM") << "Could not create curl handle"; 00867 // this is a fatal error isn't it? Are we catching this? TODO 00868 throw cms::Exception("getOneEventFromSM","DataProcessManager") 00869 << "Unable to create curl handle\n"; 00870 } 00871 // set the standard https request options 00872 std::string url2use = smURL + eventpage_; 00873 setopt(han,CURLOPT_URL,url2use.c_str()); 00874 setopt(han,CURLOPT_WRITEFUNCTION,stor::func); 00875 setopt(han,CURLOPT_WRITEDATA,&data); 00876 00877 // send our consumer ID as part of the event request 00878 // The event request body consists of the consumerId and the 00879 // number of INIT messages in our collection. The latter is used 00880 // to determine if we need to re-fetch the INIT message collection. 00881 char msgBuff[100]; 00882 OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST, 00883 2 * sizeof(char_uint32)); 00884 uint8 *bodyPtr = requestMessage.msgBody(); 00885 convert(smRegMap_[smURL], bodyPtr); 00886 bodyPtr += sizeof(char_uint32); 00887 convert((uint32) initMsgCollection_->size(), bodyPtr); 00888 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00889 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00890 struct curl_slist *headers=NULL; 00891 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00892 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00893 stor::setopt(han, CURLOPT_HTTPHEADER, headers); 00894 00895 // send the HTTP POST, read the reply, and cleanup before going on 00896 CURLcode messageStatus = curl_easy_perform(han); 00897 curl_slist_free_all(headers); 00898 curl_easy_cleanup(han); 00899 00900 if(messageStatus!=0) 00901 { 00902 cerr << "curl perform failed for event" << endl; 00903 edm::LogError("getOneEventFromSM") << "curl perform failed for event. " 00904 << "Could not get event from an already registered Storage Manager" 00905 << " at " << smURL; 00906 00907 // keep statistics for all HTTP POSTS 00908 eventFetchTimer_.stop(); 00909 ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 00910 stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 00911 00912 return false; 00913 } 00914 00915 // rely on https transfer string of correct length! 00916 int len = data.d_.length(); 00917 FDEBUG(9) << "getOneEventFromSM received len = " << len << std::endl; 00918 if(data.d_.length() == 0) 00919 { 00920 // keep statistics for all HTTP POSTS 00921 eventFetchTimer_.stop(); 00922 ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 00923 stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 00924 00925 return false; 00926 } 00927 00928 buf_.resize(len); 00929 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 00930 00931 // keep statistics for all HTTP POSTS 00932 eventFetchTimer_.stop(); 00933 ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 00934 stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime()); 00935 00936 // first check if done message 00937 OtherMessageView msgView(&buf_[0]); 00938 00939 if (msgView.code() == Header::DONE) { 00940 // TODO fixme:just print message for now 00941 std::cout << " SM " << smURL << " has halted" << std::endl; 00942 return false; 00943 } else if (msgView.code() == Header::NEW_INIT_AVAILABLE) { 00944 std::cout << "Received NEW_INIT_AVAILABLE message" << std::endl; 00945 headerRefetchRequested_ = true; 00946 return false; 00947 } else { 00948 // 05-Feb-2008, KAB: catch (and rethrow) any exceptions decoding 00949 // the event data so that we can display the returned HTML and 00950 // (hopefully) give the user a hint as to the cause of the problem. 00951 try { 00952 HeaderView hdrView(&buf_[0]); 00953 if (hdrView.code() != Header::EVENT) { 00954 throw cms::Exception("getOneEventFromSM", "DataProcessManager"); 00955 } 00956 EventMsgView eventView(&buf_[0]); 00957 ++receivedEvents_; 00958 addMeasurement((unsigned long)data.d_.length()); 00959 if(eventServer_.get() != NULL) { 00960 eventServer_->processEvent(eventView); 00961 return true; 00962 } 00963 } 00964 catch (cms::Exception excpt) { 00965 const unsigned int MAX_DUMP_LENGTH = 1000; 00966 edm::LogError("getOneEventFromSM") << "========================================"; 00967 edm::LogError("getOneEventFromSM") << "Exception decoding the getEventData response!"; 00968 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00969 edm::LogError("getOneEventFromSM") << "Here is the raw text that was returned:"; 00970 edm::LogError("getOneEventFromSM") << data.d_; 00971 } 00972 else { 00973 edm::LogError("getOneEventFromSM") << "Here are the first " << MAX_DUMP_LENGTH << 00974 " characters of the raw text that was returned:"; 00975 edm::LogError("getOneEventFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH)); 00976 } 00977 edm::LogError("getOneEventFromSM") << "========================================"; 00978 throw excpt; 00979 } 00980 } 00981 return false; 00982 }
double stor::DataProcessManager::getSampleCount | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_TIMING_TYPE | timingType = EVENT_FETCH , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Definition at line 1163 of file DataProcessManager.cc.
References DQMEVENT_FETCH, ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, SHORT_TERM, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.
01166 { 01167 if (timeFrame == SHORT_TERM) { 01168 if (timingType == DQMEVENT_FETCH) { 01169 return stDQMFetchTimeCounter_->getSampleCount(currentTime); 01170 } 01171 else { 01172 return stEventFetchTimeCounter_->getSampleCount(currentTime); 01173 } 01174 } 01175 else { 01176 if (timingType == DQMEVENT_FETCH) { 01177 return ltDQMFetchTimeCounter_->getSampleCount(); 01178 } 01179 else { 01180 return ltEventFetchTimeCounter_->getSampleCount(); 01181 } 01182 } 01183 }
stor::SMPerfStats stor::DataProcessManager::getStats | ( | ) | [inline] |
double stor::DataProcessManager::getTime2Wait | ( | std::string | smURL | ) | [private] |
Definition at line 811 of file DataProcessManager.cc.
References lastReqMap_, and minEventRequestInterval_.
Referenced by getOneEventFromSM().
00812 { 00813 // calculate time since last ping of this SM in seconds 00814 struct timeval now; 00815 struct timezone dummyTZ; 00816 gettimeofday(&now, &dummyTZ); 00817 double timeDiff = (double) now.tv_sec; 00818 timeDiff -= (double) lastReqMap_[smURL].tv_sec; 00819 timeDiff += ((double) now.tv_usec / 1000000.0); 00820 timeDiff -= ((double) lastReqMap_[smURL].tv_usec / 1000000.0); 00821 if (timeDiff < minEventRequestInterval_) 00822 { 00823 return (minEventRequestInterval_ - timeDiff); 00824 } 00825 else 00826 { 00827 return 0.0; 00828 } 00829 }
bool stor::DataProcessManager::haveHeader | ( | ) |
Definition at line 778 of file DataProcessManager.cc.
References initMsgCollection_.
Referenced by getEventFromAllSM(), and processCommands().
00779 { 00780 if(initMsgCollection_->size() > 0) return true; 00781 return false; 00782 }
bool stor::DataProcessManager::haveRegWithDQMServer | ( | ) |
Definition at line 767 of file DataProcessManager.cc.
References DQMsmList_, DQMsmRegMap_, and i.
00768 { 00769 // registered with any of the SM DQM servers 00770 if(DQMsmList_.size() > 0) { 00771 for(unsigned int i = 0; i < DQMsmList_.size(); ++i) { 00772 if(DQMsmRegMap_[DQMsmList_[i] ] > 0) return true; 00773 } 00774 } 00775 return false; 00776 }
bool stor::DataProcessManager::haveRegWithEventServer | ( | ) |
Definition at line 756 of file DataProcessManager.cc.
References i, smList_, and smRegMap_.
00757 { 00758 // registered with any of the SM event servers 00759 if(smList_.size() > 0) { 00760 for(unsigned int i = 0; i < smList_.size(); ++i) { 00761 if(smRegMap_[smList_[i] ] > 0) return true; 00762 } 00763 } 00764 return false; 00765 }
Definition at line 54 of file DataProcessManager.cc.
References alreadyRegistered_, alreadyRegisteredDQM_, consumerId_, consumerName_, consumerPriority_, consumerTopFolderName_, DQMconsumerId_, DQMconsumerName_, DQMconsumerPriority_, DQMeventpage_, DQMregpage_, DQMsmList_, DQMsmRegMap_, eventpage_, stor::SMPerfStats::fullReset(), headerpage_, headerRefetchRequested_, stor::SMPerformanceMeter::init(), ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, period4samples_, pmeter_, stor::PROXY_SERVER_NAME(), receivedDQMEvents_, receivedEvents_, regpage_, samples_, setMaxDQMEventRequestRate(), setMaxEventRequestRate(), smHeaderMap_, smList_, smRegMap_, stats_, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.
Referenced by DataProcessManager().
00055 { 00056 regpage_ = "/registerConsumer"; 00057 DQMregpage_ = "/registerDQMConsumer"; 00058 eventpage_ = "/geteventdata"; 00059 DQMeventpage_ = "/getDQMeventdata"; 00060 headerpage_ = "/getregdata"; 00061 consumerName_ = stor::PROXY_SERVER_NAME; 00062 //consumerPriority_ = "PushMode"; // this means push mode! 00063 consumerPriority_ = "Normal"; 00064 DQMconsumerName_ = stor::PROXY_SERVER_NAME; 00065 //DQMconsumerPriority_ = "PushMode"; // this means push mode! 00066 DQMconsumerPriority_ = "Normal"; 00067 00068 this->setMaxEventRequestRate(10.0); // just a default until set in config action 00069 consumerId_ = (time(0) & 0xffffff); // temporary - will get from ES later 00070 00071 this->setMaxDQMEventRequestRate(0.2); // set in config later 00072 DQMconsumerId_ = (time(0) & 0xffffff); // temporary - will get from ES later 00073 00074 alreadyRegistered_ = false; 00075 alreadyRegisteredDQM_ = false; 00076 headerRefetchRequested_ = false; 00077 00078 smList_.clear(); 00079 smRegMap_.clear(); 00080 smHeaderMap_.clear(); 00081 DQMsmList_.clear(); 00082 DQMsmRegMap_.clear(); 00083 00084 // TODO fixme: only request folders that connected consumers want? 00085 consumerTopFolderName_ = "*"; 00086 //consumerTopFolderName_ = "C1"; 00087 receivedEvents_ = 0; 00088 receivedDQMEvents_ = 0; 00089 pmeter_->init(samples_, period4samples_); 00090 stats_.fullReset(); 00091 00092 // initialize the counters that we use for statistics 00093 ltEventFetchTimeCounter_.reset(new ForeverCounter()); 00094 stEventFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20)); 00095 ltDQMFetchTimeCounter_.reset(new ForeverCounter()); 00096 stDQMFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20)); 00097 }
void stor::DataProcessManager::join | ( | ) |
Definition at line 189 of file DataProcessManager.cc.
References me_.
00190 { 00191 // invoked from a different thread - block until "me_" is done 00192 if(me_) me_->join(); 00193 }
unsigned long stor::DataProcessManager::period4samples | ( | ) | [inline] |
Definition at line 116 of file DataProcessManager.h.
References stor::SMPerformanceMeter::getPeriod4Samples(), and pmeter_.
00116 { return pmeter_->getPeriod4Samples(); }
void stor::DataProcessManager::processCommands | ( | ) | [private] |
Definition at line 195 of file DataProcessManager.cc.
References alreadyRegistered_, alreadyRegisteredDQM_, edm::EventBuffer::OperateBuffer< T >::buffer(), cmd_q_, count, dqmServiceManager_, edm::EventBuffer::empty(), getDQMEventFromAllSM(), getEventFromAllSM(), getHeaderFromAllSM(), haveHeader(), headerRefetchRequested_, NULL, registerWithAllDQMSM(), registerWithAllSM(), edm::EventBuffer::OperateBuffer< T >::size(), smHeaderMap_, and waitBetweenRegTrys().
Referenced by run().
00196 { 00197 // called with this data process manager's own thread. 00198 // first register with the SM for each subfarm 00199 bool doneWithRegistration = false; 00200 // TODO fixme: improve method of hardcored fixed retries 00201 unsigned int count = 0; // keep of count of tries and quit after 255 00202 unsigned int maxcount = 255; 00203 bool doneWithDQMRegistration = false; 00204 unsigned int countDQM = 0; // keep of count of tries and quit after 255 00205 bool alreadysaid = false; 00206 bool alreadysaidDQM = false; 00207 00208 //bool gotOneHeader = false; 00209 bool gotOneHeaderFromAll = false; 00210 unsigned int countINIT = 0; // keep of count of tries and quit after 255 00211 bool alreadysaidINIT = false; 00212 00213 bool DoneWithJob = false; 00214 while(!DoneWithJob) 00215 { 00216 // work loop 00217 // if a header re-fetch has been requested, reset the header vars 00218 if (headerRefetchRequested_) { 00219 headerRefetchRequested_ = false; 00220 //gotOneHeader = false; 00221 gotOneHeaderFromAll = false; 00222 smHeaderMap_.clear(); 00223 countINIT = 0; 00224 } 00225 // register as event consumer to all SM senders 00226 if(!alreadyRegistered_) { 00227 if(!doneWithRegistration) 00228 { 00229 waitBetweenRegTrys(); 00230 bool success = registerWithAllSM(); 00231 if(success) doneWithRegistration = true; 00232 ++count; 00233 } 00234 // TODO fixme: decide what to do after max tries 00235 if(count >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM Servers" 00236 << " after " << maxcount << " tries"; 00237 if(doneWithRegistration && !alreadysaid) { 00238 edm::LogInfo("processCommands") << "Registered with all SM Event Servers"; 00239 alreadysaid = true; 00240 } 00241 if(doneWithRegistration) alreadyRegistered_ = true; 00242 } 00243 // now register as DQM consumers 00244 if(!alreadyRegisteredDQM_) { 00245 if(!doneWithDQMRegistration) 00246 { 00247 waitBetweenRegTrys(); 00248 bool success = registerWithAllDQMSM(); 00249 if(success) doneWithDQMRegistration = true; 00250 ++countDQM; 00251 } 00252 // TODO fixme: decide what to do after max tries 00253 if(count >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM DQMEvent Servers" 00254 << " after " << maxcount << " tries"; 00255 if(doneWithDQMRegistration && !alreadysaidDQM) { 00256 edm::LogInfo("processCommands") << "Registered with all SM DQMEvent Servers"; 00257 alreadysaidDQM = true; 00258 } 00259 if(doneWithDQMRegistration) alreadyRegisteredDQM_ = true; 00260 } 00261 // now get one INIT header (product registry) and save it 00262 // as long as at least one SMsender registered with 00263 // TODO fixme: use the data member for got header to go across runs 00264 // With multiple SMs, we need to get a Header from each else that consumer 00265 // is counted as not initialized 00266 // TODO how to we get all INIT messages from each SM (and know it!) 00267 //if(!gotOneHeader) 00268 if(!gotOneHeaderFromAll) 00269 { 00270 waitBetweenRegTrys(); 00271 //bool success = getAnyHeaderFromSM(); 00272 bool success = getHeaderFromAllSM(); 00273 //if(success) gotOneHeader = true; 00274 if(success) gotOneHeaderFromAll = true; 00275 ++countINIT; 00276 } 00277 if(countINIT >= maxcount) edm::LogInfo("processCommands") << "Could not get product registry!" 00278 << " after " << maxcount << " tries"; 00279 //if(gotOneHeader && !alreadysaidINIT) { 00280 if(gotOneHeaderFromAll && !alreadysaidINIT) { 00281 edm::LogInfo("processCommands") << "Got the product registry"; 00282 alreadysaidINIT = true; 00283 } 00284 //if(alreadyRegistered_ && gotOneHeader && haveHeader()) { 00285 if(alreadyRegistered_ && gotOneHeaderFromAll && haveHeader()) { 00286 getEventFromAllSM(); 00287 } 00288 if(alreadyRegisteredDQM_) { 00289 getDQMEventFromAllSM(); 00290 } 00291 00292 // check for any commands - empty() does not block 00293 if(!cmd_q_->empty()) 00294 { 00295 // the next line blocks until there is an entry in cmd_q 00296 edm::EventBuffer::ConsumerBuffer cb(*cmd_q_); 00297 MsgCode mc(cb.buffer(),cb.size()); 00298 00299 if(mc.getCode()==MsgCode::DONE) DoneWithJob = true; 00300 // right now we will ignore all messages other than DONE 00301 } 00302 00303 } // done with process loop 00304 edm::LogInfo("processCommands") << "Received done - stopping"; 00305 if(dqmServiceManager_.get() != NULL) dqmServiceManager_->stop(); 00306 }
unsigned long stor::DataProcessManager::receivedDQMevents | ( | ) | [inline] |
Definition at line 110 of file DataProcessManager.h.
References receivedDQMEvents_.
00110 { return receivedDQMEvents_; }
unsigned long stor::DataProcessManager::receivedevents | ( | ) | [inline] |
Definition at line 109 of file DataProcessManager.h.
References receivedEvents_.
00109 { return receivedEvents_; }
bool stor::DataProcessManager::registerWithAllDQMSM | ( | ) | [private] |
Definition at line 369 of file DataProcessManager.cc.
References DQMsmList_, DQMsmRegMap_, i, and registerWithDQMSM().
Referenced by processCommands().
00370 { 00371 // One try at registering with the SM on each subfarm 00372 // return true if registered with all SM 00373 // Only make one attempt and return so we can make this thread stop 00374 if(DQMsmList_.size() == 0) return false; 00375 bool allRegistered = true; 00376 for(unsigned int i = 0; i < DQMsmList_.size(); ++i) { 00377 if(DQMsmRegMap_[DQMsmList_[i] ] > 0) continue; // already registered 00378 int consumerid = registerWithDQMSM(DQMsmList_[i]); 00379 if(consumerid > 0) DQMsmRegMap_[DQMsmList_[i] ] = consumerid; 00380 else allRegistered = false; 00381 } 00382 return allRegistered; 00383 }
bool stor::DataProcessManager::registerWithAllSM | ( | ) | [private] |
Definition at line 353 of file DataProcessManager.cc.
References i, registerWithSM(), smList_, and smRegMap_.
Referenced by processCommands().
00354 { 00355 // One try at registering with the SM on each subfarm 00356 // return true if registered with all SM 00357 // Only make one attempt and return so we can make this thread stop 00358 if(smList_.size() == 0) return false; 00359 bool allRegistered = true; 00360 for(unsigned int i = 0; i < smList_.size(); ++i) { 00361 if(smRegMap_[smList_[i] ] > 0) continue; // already registered 00362 int consumerid = registerWithSM(smList_[i]); 00363 if(consumerid > 0) smRegMap_[smList_[i] ] = consumerid; 00364 else allRegistered = false; 00365 } 00366 return allRegistered; 00367 }
int stor::DataProcessManager::registerWithDQMSM | ( | std::string | smURL | ) | [private] |
Definition at line 472 of file DataProcessManager.cc.
References buf_, TestMuL1L2Filter_cff::cerr, consumerTopFolderName_, stor::ReadData::d_, data, DQMconsumerName_, DQMconsumerPriority_, DQMregpage_, lat::endl(), ConsRegResponseBuilder::ES_NOT_READY, Exception, FDEBUG, stor::func(), i, len, NULL, stor::setopt(), ConsRegRequestBuilder::size(), and ConsRegRequestBuilder::startAddress().
Referenced by registerWithAllDQMSM().
00473 { 00474 // Use this same registration method for both event data and DQM data 00475 // return with consumerID or 0 for failure 00476 stor::ReadData data; 00477 00478 data.d_.clear(); 00479 CURL* han = curl_easy_init(); 00480 if(han==0) 00481 { 00482 edm::LogError("registerWithDQMSM") << "Could not create curl handle"; 00483 // this is a fatal error isn't it? Are we catching this? TODO 00484 throw cms::Exception("registerWithDQMSM","DataProcessManager") 00485 << "Unable to create curl handle\n"; 00486 } 00487 // set the standard https request options 00488 std::string url2use = smURL + DQMregpage_; 00489 setopt(han,CURLOPT_URL,url2use.c_str()); 00490 setopt(han,CURLOPT_WRITEFUNCTION,func); 00491 setopt(han,CURLOPT_WRITEDATA,&data); 00492 00493 // build the registration request message to send to the storage manager 00494 const int BUFFER_SIZE = 2000; 00495 char msgBuff[BUFFER_SIZE]; 00496 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, DQMconsumerName_, 00497 DQMconsumerPriority_, consumerTopFolderName_); 00498 00499 // add the request message as a https post 00500 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00501 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00502 struct curl_slist *headers=NULL; 00503 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00504 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00505 setopt(han, CURLOPT_HTTPHEADER, headers); 00506 00507 // send the HTTP POST, read the reply, and cleanup before going on 00508 CURLcode messageStatus = curl_easy_perform(han); 00509 curl_slist_free_all(headers); 00510 curl_easy_cleanup(han); 00511 00512 if(messageStatus!=0) 00513 { 00514 cerr << "curl perform failed for DQM registration" << endl; 00515 edm::LogError("registerWithDQMSM") << "curl perform failed for registration. " 00516 << "Could not register with DQM: probably XDAQ not running on Storage Manager" 00517 << " at " << smURL; 00518 return 0; 00519 } 00520 uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY; 00521 int consumerId = 0; 00522 if(data.d_.length() > 0) 00523 { 00524 int len = data.d_.length(); 00525 FDEBUG(9) << "registerWithDQMSM received len = " << len << std::endl; 00526 buf_.resize(len); 00527 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 00528 00529 try { 00530 ConsRegResponseView respView(&buf_[0]); 00531 registrationStatus = respView.getStatus(); 00532 consumerId = respView.getConsumerId(); 00533 } 00534 catch (cms::Exception excpt) { 00535 const unsigned int MAX_DUMP_LENGTH = 1000; 00536 edm::LogError("registerWithDQMSM") << "========================================"; 00537 edm::LogError("registerWithDQMSM") << "Exception decoding the registerWithSM response!"; 00538 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00539 edm::LogError("registerWithDQMSM") << "Here is the raw text that was returned:"; 00540 edm::LogError("registerWithDQMSM") << data.d_; 00541 } 00542 else { 00543 edm::LogError("registerWithDQMSM") << "Here are the first " << MAX_DUMP_LENGTH << 00544 " characters of the raw text that was returned:"; 00545 edm::LogError("registerWithDQMSM") << (data.d_.substr(0, MAX_DUMP_LENGTH)); 00546 } 00547 edm::LogError("registerWithDQMSM") << "========================================"; 00548 return 0; 00549 } 00550 } 00551 if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0; 00552 FDEBUG(5) << "Consumer ID = " << consumerId << endl; 00553 return consumerId; 00554 }
int stor::DataProcessManager::registerWithSM | ( | std::string | smURL | ) | [private] |
Definition at line 385 of file DataProcessManager.cc.
References buf_, TestMuL1L2Filter_cff::cerr, consumerName_, consumerPriority_, consumerPSetString_, stor::ReadData::d_, data, lat::endl(), ConsRegResponseBuilder::ES_NOT_READY, eventServer_, Exception, FDEBUG, stor::func(), i, len, NULL, regpage_, stor::setopt(), ConsRegRequestBuilder::size(), and ConsRegRequestBuilder::startAddress().
Referenced by registerWithAllSM().
00386 { 00387 // Use this same registration method for both event data and DQM data 00388 // return with consumerID or 0 for failure 00389 stor::ReadData data; 00390 00391 data.d_.clear(); 00392 CURL* han = curl_easy_init(); 00393 if(han==0) 00394 { 00395 edm::LogError("registerWithSM") << "Could not create curl handle"; 00396 // this is a fatal error isn't it? Are we catching this? TODO 00397 throw cms::Exception("registerWithSM","DataProcessManager") 00398 << "Unable to create curl handle\n"; 00399 } 00400 // set the standard https request options 00401 std::string url2use = smURL + regpage_; 00402 setopt(han,CURLOPT_URL,url2use.c_str()); 00403 setopt(han,CURLOPT_WRITEFUNCTION,func); 00404 setopt(han,CURLOPT_WRITEDATA,&data); 00405 00406 // build the registration request message to send to the storage manager 00407 const int BUFFER_SIZE = 2000; 00408 char msgBuff[BUFFER_SIZE]; 00409 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_, 00410 consumerPriority_, consumerPSetString_); 00411 00412 // add the request message as a https post 00413 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00414 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00415 struct curl_slist *headers=NULL; 00416 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00417 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00418 setopt(han, CURLOPT_HTTPHEADER, headers); 00419 00420 // send the HTTP POST, read the reply, and cleanup before going on 00421 CURLcode messageStatus = curl_easy_perform(han); 00422 curl_slist_free_all(headers); 00423 curl_easy_cleanup(han); 00424 00425 if(messageStatus!=0) 00426 { 00427 cerr << "curl perform failed for registration" << endl; 00428 edm::LogError("registerWithSM") << "curl perform failed for registration. " 00429 << "Could not register: probably XDAQ not running on Storage Manager" 00430 << " at " << smURL; 00431 return 0; 00432 } 00433 uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY; 00434 int consumerId = 0; 00435 if(data.d_.length() > 0) 00436 { 00437 int len = data.d_.length(); 00438 FDEBUG(9) << "registerWithSM received len = " << len << std::endl; 00439 buf_.resize(len); 00440 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 00441 00442 try { 00443 ConsRegResponseView respView(&buf_[0]); 00444 registrationStatus = respView.getStatus(); 00445 consumerId = respView.getConsumerId(); 00446 if (eventServer_.get() != NULL) { 00447 eventServer_->setStreamSelectionTable(respView.getStreamSelectionTable()); 00448 } 00449 } 00450 catch (cms::Exception excpt) { 00451 const unsigned int MAX_DUMP_LENGTH = 1000; 00452 edm::LogError("registerWithSM") << "========================================"; 00453 edm::LogError("registerWithSM") << "Exception decoding the registerWithSM response!"; 00454 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00455 edm::LogError("registerWithSM") << "Here is the raw text that was returned:"; 00456 edm::LogError("registerWithSM") << data.d_; 00457 } 00458 else { 00459 edm::LogError("registerWithSM") << "Here are the first " << MAX_DUMP_LENGTH << 00460 " characters of the raw text that was returned:"; 00461 edm::LogError("registerWithSM") << (data.d_.substr(0, MAX_DUMP_LENGTH)); 00462 } 00463 edm::LogError("registerWithSM") << "========================================"; 00464 return 0; 00465 } 00466 } 00467 if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0; 00468 FDEBUG(5) << "Consumer ID = " << consumerId << endl; 00469 return consumerId; 00470 }
void stor::DataProcessManager::run | ( | DataProcessManager * | t | ) | [static, private] |
Definition at line 166 of file DataProcessManager.cc.
References processCommands().
Referenced by start().
00167 { 00168 t->processCommands(); 00169 }
unsigned long stor::DataProcessManager::samples | ( | ) | [inline] |
Definition at line 115 of file DataProcessManager.h.
References stor::SMPerformanceMeter::getSetSamples(), and pmeter_.
00115 { return pmeter_->getSetSamples(); }
Definition at line 67 of file DataProcessManager.h.
References dqmServiceManager_.
00068 { dqmServiceManager_->setArchiveDQM(archiveDQM); }
Definition at line 70 of file DataProcessManager.h.
References dqmServiceManager_.
00071 { dqmServiceManager_->setArchiveInterval(archiveInterval); }
Definition at line 64 of file DataProcessManager.h.
References dqmServiceManager_.
00065 { dqmServiceManager_->setCollateDQM(collateDQM); }
Definition at line 85 of file DataProcessManager.h.
References dqmServiceManager_.
00086 { dqmServiceManager_->setCompressionLevel(compressionLevelDQM);}
void stor::DataProcessManager::setConsumerName | ( | std::string | s | ) | [inline] |
Definition at line 99 of file DataProcessManager.h.
References consumerName_.
00099 { consumerName_ = s; }
void stor::DataProcessManager::setDQMConsumerName | ( | std::string | s | ) | [inline] |
Definition at line 100 of file DataProcessManager.h.
References DQMconsumerName_.
00100 { DQMconsumerName_ = s; }
void stor::DataProcessManager::setDQMEventServer | ( | boost::shared_ptr< DQMEventServer > & | es | ) | [inline] |
Definition at line 88 of file DataProcessManager.h.
References DQMeventServer_, dqmServiceManager_, and NULL.
00089 { 00090 // The auto_ptr still owns the memory after this get() 00091 if (dqmServiceManager_.get() != NULL) dqmServiceManager_->setDQMEventServer(es); 00092 DQMeventServer_ = es; 00093 }
void stor::DataProcessManager::setDQMTime2Now | ( | std::string | smURL | ) | [private] |
Definition at line 1034 of file DataProcessManager.cc.
References lastDQMReqMap_.
Referenced by getOneDQMEventFromSM().
01035 { 01036 struct timeval now; 01037 struct timezone dummyTZ; 01038 gettimeofday(&now, &dummyTZ); 01039 lastDQMReqMap_[smURL] = now; 01040 }
void stor::DataProcessManager::setEventServer | ( | boost::shared_ptr< EventServer > & | es | ) | [inline] |
Definition at line 47 of file DataProcessManager.h.
References eventServer_.
00048 { 00049 eventServer_ = es; 00050 }
void stor::DataProcessManager::setFilePrefixDQM | ( | std::string | filePrefixDQM | ) | [inline] |
Definition at line 79 of file DataProcessManager.h.
References dqmServiceManager_.
00080 { dqmServiceManager_->setFilePrefix(filePrefixDQM);}
void stor::DataProcessManager::setInitMsgCollection | ( | boost::shared_ptr< InitMsgCollection > & | imColl | ) | [inline] |
Definition at line 53 of file DataProcessManager.h.
References initMsgCollection_.
00054 { 00055 initMsgCollection_ = imColl; 00056 }
void stor::DataProcessManager::setMaxDQMEventRequestRate | ( | double | rate | ) |
Definition at line 119 of file DataProcessManager.cc.
References minDQMEventRequestInterval_.
Referenced by init().
00120 { 00121 const double MAX_REQUEST_INTERVAL = 300.0; // seconds 00122 if(rate <= 0.0) return; // TODO make sure config is checked! 00123 if (rate < (1.0 / MAX_REQUEST_INTERVAL)) { 00124 minDQMEventRequestInterval_ = MAX_REQUEST_INTERVAL; 00125 } 00126 else { 00127 minDQMEventRequestInterval_ = 1.0 / rate; // seconds 00128 } 00129 }
void stor::DataProcessManager::setMaxEventRequestRate | ( | double | rate | ) |
Definition at line 112 of file DataProcessManager.cc.
References maxEventRequestRate_, and updateMinEventRequestInterval().
Referenced by init().
00113 { 00114 if(rate <= 0.0) return; // TODO make sure config is checked! 00115 maxEventRequestRate_ = rate; 00116 updateMinEventRequestInterval(); 00117 }
void stor::DataProcessManager::setPeriod4Samples | ( | unsigned long | period4samples | ) |
Definition at line 105 of file DataProcessManager.cc.
References period4samples_, pmeter_, and stor::SMPerformanceMeter::setPeriod4Samples().
00106 { 00107 period4samples_ = period4samples; 00108 pmeter_->setPeriod4Samples(period4samples); 00109 }
Definition at line 73 of file DataProcessManager.h.
References dqmServiceManager_.
00074 { dqmServiceManager_->setPurgeTime(purgeTimeDQM);}
Definition at line 76 of file DataProcessManager.h.
References dqmServiceManager_.
00077 { dqmServiceManager_->setReadyTime(readyTimeDQM);}
void stor::DataProcessManager::setSamples | ( | unsigned long | num_samples | ) |
Definition at line 99 of file DataProcessManager.cc.
References pmeter_, samples_, and stor::SMPerformanceMeter::setSamples().
00100 { 00101 samples_ = num_samples; 00102 pmeter_->setSamples(num_samples); 00103 }
void stor::DataProcessManager::setTime2Now | ( | std::string | smURL | ) | [private] |
Definition at line 831 of file DataProcessManager.cc.
References lastReqMap_.
Referenced by getOneEventFromSM().
00832 { 00833 struct timeval now; 00834 struct timezone dummyTZ; 00835 gettimeofday(&now, &dummyTZ); 00836 lastReqMap_[smURL] = now; 00837 }
Definition at line 82 of file DataProcessManager.h.
References dqmServiceManager_.
00083 { dqmServiceManager_->setUseCompression(useCompressionDQM);}
Definition at line 171 of file DataProcessManager.cc.
References me_, run(), and thread.
00172 { 00173 // called from a different thread to start things going 00174 00175 me_.reset(new boost::thread(boost::bind(DataProcessManager::run,this))); 00176 }
void stor::DataProcessManager::stop | ( | ) |
Definition at line 178 of file DataProcessManager.cc.
References edm::EventBuffer::OperateBuffer< T >::buffer(), cmd_q_, and edm::EventBuffer::OperateBuffer< T >::commit().
00179 { 00180 // called from a different thread - trigger completion to the 00181 // data process manager loop 00182 00183 edm::EventBuffer::ProducerBuffer cb(*cmd_q_); 00184 MsgCode mc(cb.buffer(),MsgCode::DONE); 00185 mc.setCode(MsgCode::DONE); 00186 cb.commit(mc.codeSize()); 00187 }
double stor::DataProcessManager::totalvolumemb | ( | ) | [inline] |
Definition at line 117 of file DataProcessManager.h.
References pmeter_, and stor::SMPerformanceMeter::totalvolumemb().
00117 { return pmeter_->totalvolumemb(); }
void stor::DataProcessManager::updateMinEventRequestInterval | ( | ) |
Definition at line 131 of file DataProcessManager.cc.
References consumerPSetString_, edm::ParameterSet::insert(), maxEventRequestRate_, minEventRequestInterval_, smList_, and edm::ParameterSet::toString().
Referenced by addSM2Register(), and setMaxEventRequestRate().
00132 { 00133 const double MAX_REQUEST_INTERVAL = 300.0; // seconds 00134 double rate = maxEventRequestRate_; 00135 00136 if (rate < (1.0 / MAX_REQUEST_INTERVAL)) { 00137 minEventRequestInterval_ = MAX_REQUEST_INTERVAL; 00138 } 00139 else { 00140 // base the interval on the number of storage managers 00141 // (so that the requested rate doesn't increase proportionally 00142 // with number of SMs) 00143 // We could simply use smCount/rate, but let's be a bit more 00144 // generous than that so that if one of the SMs isn't queueing 00145 // events for us, we still get enough rate 00146 if (smList_.size() <= 1) { 00147 minEventRequestInterval_ = 1.0 / rate; // seconds 00148 } 00149 else { 00150 minEventRequestInterval_ = (smList_.size() - 1) / rate; 00151 } 00152 } 00153 00154 // 16-Apr-2008, KAB: set maxEventRequestRate in the parameterSet that 00155 // we send to the storage manager so that the SM knows how many events 00156 // to queue for the SMPS. 00157 edm::ParameterSet ps = ParameterSet(); 00158 Entry maxRateEntry("maxEventRequestRate", 00159 (1.0 / minEventRequestInterval_), 00160 false); 00161 ps.insert(true, "maxEventRequestRate", maxRateEntry); 00162 // TODO fixme: only request event types that are requested by connected consumers? 00163 consumerPSetString_ = ps.toString(); 00164 }
void stor::DataProcessManager::waitBetweenRegTrys | ( | ) | [private] |
Definition at line 749 of file DataProcessManager.cc.
References headerRetryInterval_.
Referenced by processCommands().
00750 { 00751 // for now just a simple wait for a fixed time 00752 sleep(headerRetryInterval_); 00753 return; 00754 }
std::vector<unsigned char> stor::DataProcessManager::buf_ [private] |
Definition at line 156 of file DataProcessManager.h.
Referenced by getOneDQMEventFromSM(), getOneEventFromSM(), registerWithDQMSM(), and registerWithSM().
edm::EventBuffer* stor::DataProcessManager::cmd_q_ [private] |
Definition at line 151 of file DataProcessManager.h.
Referenced by getCommandQueue(), processCommands(), and stop().
unsigned int stor::DataProcessManager::consumerId_ [private] |
std::string stor::DataProcessManager::consumerName_ [private] |
Definition at line 170 of file DataProcessManager.h.
Referenced by init(), registerWithSM(), and setConsumerName().
std::string stor::DataProcessManager::consumerPriority_ [private] |
std::string stor::DataProcessManager::consumerPSetString_ [private] |
Definition at line 172 of file DataProcessManager.h.
Referenced by registerWithSM(), and updateMinEventRequestInterval().
std::string stor::DataProcessManager::consumerTopFolderName_ [private] |
unsigned int stor::DataProcessManager::DQMconsumerId_ [private] |
std::string stor::DataProcessManager::DQMconsumerName_ [private] |
Definition at line 181 of file DataProcessManager.h.
Referenced by init(), registerWithDQMSM(), and setDQMConsumerName().
std::string stor::DataProcessManager::DQMconsumerPriority_ [private] |
std::string stor::DataProcessManager::DQMeventpage_ [private] |
Definition at line 165 of file DataProcessManager.h.
Referenced by getOneDQMEventFromSM(), and init().
boost::shared_ptr<DQMEventServer> stor::DataProcessManager::DQMeventServer_ [private] |
Definition at line 189 of file DataProcessManager.h.
Referenced by getDQMEventServer(), and setDQMEventServer().
std::string stor::DataProcessManager::DQMregpage_ [private] |
boost::shared_ptr<stor::DQMServiceManager> stor::DataProcessManager::dqmServiceManager_ [private] |
Definition at line 186 of file DataProcessManager.h.
Referenced by getDQMServiceManager(), getOneDQMEventFromSM(), processCommands(), setArchiveDQM(), setArchiveIntervalDQM(), setCollateDQM(), setCompressionLevelDQM(), setDQMEventServer(), setFilePrefixDQM(), setPurgeTimeDQM(), setReadyTimeDQM(), and setUseCompressionDQM().
std::vector<std::string> stor::DataProcessManager::DQMsmList_ [private] |
Definition at line 161 of file DataProcessManager.h.
Referenced by addDQMSM2Register(), haveRegWithDQMServer(), init(), and registerWithAllDQMSM().
Definition at line 162 of file DataProcessManager.h.
Referenced by addDQMSM2Register(), getDQMEventFromAllSM(), getOneDQMEventFromSM(), haveRegWithDQMServer(), init(), and registerWithAllDQMSM().
Definition at line 204 of file DataProcessManager.h.
Referenced by getOneDQMEventFromSM(), and getOneEventFromSM().
std::string stor::DataProcessManager::eventpage_ [private] |
boost::shared_ptr<EventServer> stor::DataProcessManager::eventServer_ [private] |
Definition at line 188 of file DataProcessManager.h.
Referenced by getEventServer(), getHeaderFromSM(), getOneEventFromSM(), registerWithSM(), and setEventServer().
std::string stor::DataProcessManager::headerpage_ [private] |
Definition at line 155 of file DataProcessManager.h.
Referenced by getOneEventFromSM(), init(), and processCommands().
boost::shared_ptr<InitMsgCollection> stor::DataProcessManager::initMsgCollection_ [private] |
Definition at line 190 of file DataProcessManager.h.
Referenced by getHeaderFromSM(), getInitMsgCollection(), getOneEventFromSM(), haveHeader(), and setInitMsgCollection().
Definition at line 180 of file DataProcessManager.h.
Referenced by addDQMSM2Register(), getDQMTime2Wait(), and setDQMTime2Now().
Definition at line 177 of file DataProcessManager.h.
Referenced by addSM2Register(), getTime2Wait(), and setTime2Now().
boost::shared_ptr<ForeverCounter> stor::DataProcessManager::ltDQMFetchTimeCounter_ [private] |
Definition at line 208 of file DataProcessManager.h.
Referenced by getAverageValue(), getDuration(), getOneDQMEventFromSM(), getSampleCount(), and init().
boost::shared_ptr<ForeverCounter> stor::DataProcessManager::ltEventFetchTimeCounter_ [private] |
Definition at line 206 of file DataProcessManager.h.
Referenced by getAverageValue(), getDuration(), getOneEventFromSM(), getSampleCount(), and init().
double stor::DataProcessManager::maxEventRequestRate_ [private] |
Definition at line 174 of file DataProcessManager.h.
Referenced by setMaxEventRequestRate(), and updateMinEventRequestInterval().
boost::shared_ptr<boost::thread> stor::DataProcessManager::me_ [private] |
double stor::DataProcessManager::minDQMEventRequestInterval_ [private] |
Definition at line 178 of file DataProcessManager.h.
Referenced by getDQMTime2Wait(), and setMaxDQMEventRequestRate().
double stor::DataProcessManager::minEventRequestInterval_ [private] |
Definition at line 175 of file DataProcessManager.h.
Referenced by getTime2Wait(), and updateMinEventRequestInterval().
xdata::UnsignedInteger32 stor::DataProcessManager::period4samples_ [private] |
Definition at line 197 of file DataProcessManager.h.
Referenced by addMeasurement(), DataProcessManager(), init(), period4samples(), samples(), setPeriod4Samples(), setSamples(), totalvolumemb(), and ~DataProcessManager().
unsigned long stor::DataProcessManager::receivedDQMEvents_ [private] |
Definition at line 196 of file DataProcessManager.h.
Referenced by getOneDQMEventFromSM(), init(), and receivedDQMevents().
unsigned long stor::DataProcessManager::receivedEvents_ [private] |
Definition at line 195 of file DataProcessManager.h.
Referenced by getOneEventFromSM(), init(), and receivedevents().
std::string stor::DataProcessManager::regpage_ [private] |
xdata::UnsignedInteger32 stor::DataProcessManager::samples_ [private] |
Definition at line 160 of file DataProcessManager.h.
Referenced by addSM2Register(), getHeaderFromAllSM(), init(), and processCommands().
std::vector<std::string> stor::DataProcessManager::smList_ [private] |
Definition at line 158 of file DataProcessManager.h.
Referenced by addSM2Register(), getAnyHeaderFromSM(), getDQMEventFromAllSM(), getEventFromAllSM(), getHeaderFromAllSM(), haveRegWithEventServer(), init(), registerWithAllSM(), and updateMinEventRequestInterval().
Definition at line 159 of file DataProcessManager.h.
Referenced by addSM2Register(), getAnyHeaderFromSM(), getEventFromAllSM(), getHeaderFromAllSM(), getHeaderFromSM(), getOneEventFromSM(), haveRegWithEventServer(), init(), and registerWithAllSM().
Definition at line 201 of file DataProcessManager.h.
Referenced by addMeasurement(), getStats(), and init().
boost::shared_ptr<RollingIntervalCounter> stor::DataProcessManager::stDQMFetchTimeCounter_ [private] |
Definition at line 209 of file DataProcessManager.h.
Referenced by getAverageValue(), getDuration(), getOneDQMEventFromSM(), getSampleCount(), and init().
boost::shared_ptr<RollingIntervalCounter> stor::DataProcessManager::stEventFetchTimeCounter_ [private] |
Definition at line 207 of file DataProcessManager.h.
Referenced by getAverageValue(), getDuration(), getOneEventFromSM(), getSampleCount(), and init().
char stor::DataProcessManager::subscriptionurl_[2048] [private] |
Definition at line 168 of file DataProcessManager.h.