00001
00010 #include "EventFilter/StorageManager/interface/ConsumerPipe.h"
00011 #include "FWCore/Utilities/interface/DebugMacros.h"
00012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00013
00014
00015
00016
00017 using namespace std;
00018 using namespace stor;
00019 using namespace edm;
00020
00024 uint32 ConsumerPipe::rootId_ = 1;
00025
00029 boost::mutex ConsumerPipe::rootIdLock_;
00030
00034 const double ConsumerPipe::MAX_ACCEPT_INTERVAL = 86400.0;
00035
00039 ConsumerPipe::ConsumerPipe(std::string name, std::string priority,
00040 int activeTimeout, int idleTimeout,
00041 Strings triggerSelection, double rateRequest,
00042 std::string hltOutputSelection,
00043 std::string hostName, int queueSize):
00044 han_(curl_easy_init()),
00045 headers_(),
00046 consumerName_(name),consumerPriority_(priority),
00047 events_(0),
00048 triggerSelection_(triggerSelection),
00049 rateRequest_(rateRequest),
00050 hltOutputSelection_(hltOutputSelection),
00051 hostName_(hostName),
00052 pushEventFailures_(0),
00053 maxQueueSize_(queueSize)
00054 {
00055
00056 timeToIdleState_ = activeTimeout;
00057 timeToDisconnectedState_ = activeTimeout + idleTimeout;
00058 lastEventRequestTime_ = time(NULL);
00059 initializationDone = false;
00060 pushMode_ = false;
00061 if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true;
00062 registryWarningWasReported_ = false;
00063
00064
00065 consumerIsProxyServer_ = false;
00066
00067 if (consumerName_.find("urn") != std::string::npos &&
00068 consumerName_.find("xdaq") != std::string::npos &&
00069 consumerName_.find("pushEventData") != std::string::npos)
00070 {
00071 consumerIsProxyServer_ = true;
00072 }
00073
00074
00075 boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_);
00076 consumerId_ = rootId_;
00077 rootId_++;
00078
00079 if(han_==0)
00080 {
00081 edm::LogError("ConsumerPipe") << "Could not create curl handle";
00082
00083
00084
00085 } else {
00086 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00087 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00088
00089
00090 headers_ = curl_slist_append(headers_, "Expect:");
00091 setopt(han_, CURLOPT_HTTPHEADER, headers_);
00092 setopt(han_, CURLOPT_URL, consumerName_.c_str());
00093 setopt(han_, CURLOPT_WRITEFUNCTION, func);
00094
00095
00096
00097 }
00098
00099
00100
00101
00102
00103 if (rateRequest_ < (1.0 / MAX_ACCEPT_INTERVAL))
00104 {
00105 minTimeBetweenEvents_ = MAX_ACCEPT_INTERVAL;
00106 }
00107 else
00108 {
00109 minTimeBetweenEvents_ = 1.0 / rateRequest_;
00110 }
00111 lastConsideredEventTime_ = BaseCounter::getCurrentTime();
00112 rateRequestCounter_.reset(new RollingSampleCounter(50,1,60,RollingSampleCounter::INCLUDE_SAMPLES_IMMEDIATELY));
00113
00114
00115 longTermDesiredCounter_.reset(new ForeverCounter());
00116 shortTermDesiredCounter_.reset(new RollingIntervalCounter(180,5,20));
00117 longTermQueuedCounter_.reset(new ForeverCounter());
00118 shortTermQueuedCounter_.reset(new RollingIntervalCounter(180,5,20));
00119 longTermServedCounter_.reset(new ForeverCounter());
00120 shortTermServedCounter_.reset(new RollingIntervalCounter(180,5,20));
00121
00122 ltQueueSizeWhenDesiredCounter_.reset(new ForeverCounter());
00123 stQueueSizeWhenDesiredCounter_.reset(new RollingIntervalCounter(180,5,20));
00124 ltQueueSizeWhenQueuedCounter_.reset(new ForeverCounter());
00125 stQueueSizeWhenQueuedCounter_.reset(new RollingIntervalCounter(180,5,20));
00126 }
00127
00131 ConsumerPipe::~ConsumerPipe()
00132 {
00133 FDEBUG(5) << "Executing destructor for consumer pipe with ID = " <<
00134 consumerId_ << std::endl;
00135 curl_slist_free_all(headers_);
00136 curl_easy_cleanup(han_);
00137 }
00138
00142 uint32 ConsumerPipe::getConsumerId() const
00143 {
00144 return consumerId_;
00145 }
00146
00152 void ConsumerPipe::initializeSelection(Strings const& fullTriggerList,
00153 uint32 outputModuleId)
00154 {
00155 FDEBUG(5) << "Initializing consumer pipe, ID = " <<
00156 consumerId_ << std::endl;
00157
00158
00159 hltOutputModuleId_ = outputModuleId;
00160
00161
00162 eventSelector_.reset(new EventSelector(triggerSelection_,
00163 fullTriggerList));
00164
00165 initializationDone = true;
00166
00167 }
00168
00173 bool ConsumerPipe::isActive() const
00174 {
00175 time_t timeDiff = time(NULL) - lastEventRequestTime_;
00176 return (timeDiff < timeToIdleState_);
00177 }
00178
00184 bool ConsumerPipe::isIdle() const
00185 {
00186 time_t timeDiff = time(NULL) - lastEventRequestTime_;
00187 return (timeDiff >= timeToIdleState_ &&
00188 timeDiff < timeToDisconnectedState_);
00189 }
00190
00194 bool ConsumerPipe::isDisconnected() const
00195 {
00196 time_t timeDiff = time(NULL) - lastEventRequestTime_;
00197 return (timeDiff >= timeToDisconnectedState_);
00198 }
00199
00207 bool ConsumerPipe::isReadyForEvent(double currentTime) const
00208 {
00209
00210
00211 if (! initializationDone) {return false;}
00212 if (! this->isActive()) {return false;}
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 if (! rateRequestCounter_->hasValidResult()) {
00225 return ((currentTime - lastConsideredEventTime_) >= minTimeBetweenEvents_);
00226 }
00227 else {
00228 return (rateRequestCounter_->getSampleRate(currentTime) < rateRequest_);
00229 }
00230 }
00231
00238 bool ConsumerPipe::wantsEvent(EventMsgView const& eventView) const
00239 {
00240
00241
00242 if (! initializationDone) {return false;}
00243 if (! this->isActive()) {return false;}
00244
00245
00246 if (! this->isProxyServer() &&
00247 eventView.outModId() != hltOutputModuleId_) {return false;}
00248
00249
00250 std::vector<unsigned char> hlt_out;
00251 hlt_out.resize(1 + (eventView.hltCount()-1)/4);
00252 eventView.hltTriggerBits(&hlt_out[0]);
00253 int num_paths = eventView.hltCount();
00254 bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths));
00255
00256
00257
00258 if (rc) {
00259 double now = BaseCounter::getCurrentTime();
00260 double sizeInMB = static_cast<double>(eventView.size()) / 1048576.0;
00261 ltQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size());
00262 stQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size(), now);
00263 longTermDesiredCounter_->addSample(sizeInMB);
00264 shortTermDesiredCounter_->addSample(sizeInMB, now);
00265 }
00266 return rc;
00267 }
00268
00283 void ConsumerPipe::wasConsidered(double currentTime)
00284 {
00285 lastConsideredEventTime_ = currentTime;
00286 rateRequestCounter_->addSample(1.0, currentTime);
00287 }
00288
00292 void ConsumerPipe::putEvent(boost::shared_ptr< std::vector<char> > bufPtr)
00293 {
00294
00295 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00296
00297
00298 double now = BaseCounter::getCurrentTime();
00299 double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0;
00300 ltQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size());
00301 stQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size(), now);
00302 longTermQueuedCounter_->addSample(sizeInMB);
00303 shortTermQueuedCounter_->addSample(sizeInMB, now);
00304
00305 eventQueue_.push_back(bufPtr);
00306
00307 while (eventQueue_.size() > maxQueueSize_) {
00308 eventQueue_.pop_front();
00309 }
00310
00311 if(pushMode_) {
00312 bool success = pushEvent();
00313
00314 if(!success) ++pushEventFailures_;
00315 else
00316 {
00317 lastEventRequestTime_ = time(NULL);
00318 ++events_;
00319
00320 longTermServedCounter_->addSample(sizeInMB);
00321 shortTermServedCounter_->addSample(sizeInMB, now);
00322 }
00323 }
00324 }
00325
00331 boost::shared_ptr< std::vector<char> > ConsumerPipe::getEvent()
00332 {
00333
00334 if (isIdle() || isDisconnected())
00335 {
00336 this->clearQueue();
00337 }
00338
00339
00340 boost::shared_ptr< std::vector<char> > bufPtr;
00341 {
00342 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00343 if (! eventQueue_.empty())
00344 {
00345 bufPtr = eventQueue_.front();
00346 eventQueue_.pop_front();
00347
00348
00349 ++events_;
00350 double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0;
00351 longTermServedCounter_->addSample(sizeInMB);
00352 shortTermServedCounter_->addSample(sizeInMB);
00353 }
00354 }
00355
00356
00357 lastEventRequestTime_ = time(NULL);
00358
00359
00360 return bufPtr;
00361 }
00362
00363 bool ConsumerPipe::pushEvent()
00364 {
00365
00366 boost::shared_ptr< std::vector<char> > bufPtr;
00367 {
00368 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00369 if (! eventQueue_.empty())
00370 {
00371 bufPtr = eventQueue_.front();
00372 eventQueue_.pop_front();
00373 }
00374 }
00375 if (bufPtr.get() == NULL)
00376 {
00377 edm::LogError("pushEvent") << "========================================";
00378 edm::LogError("pushEvent") << "pushEvent called with empty event queue!";
00379 return false;
00380 }
00381
00382
00383 FDEBUG(5) << "pushing out event to " << consumerName_ << std::endl;
00384 stor::ReadData data;
00385
00386 data.d_.clear();
00387
00388 if(han_==0)
00389 {
00390 han_ = curl_easy_init();
00391 if(han_==0)
00392 {
00393 edm::LogError("pushEvent") << "Could not create curl handle";
00394 return false;
00395 }
00396 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00397 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00398
00399 headers_ = curl_slist_append(headers_, "Expect:");
00400 setopt(han_, CURLOPT_HTTPHEADER, headers_);
00401 setopt(han_, CURLOPT_URL, consumerName_.c_str());
00402 setopt(han_, CURLOPT_WRITEFUNCTION, func);
00403 }
00404
00405 setopt(han_,CURLOPT_WRITEDATA,&data);
00406
00407
00408 EventMsgView msgView(&(*bufPtr)[0]);
00409
00410
00411 setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress());
00412 setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size());
00413
00414
00415
00416
00417 setopt(han_,CURLOPT_FORBID_REUSE, 1);
00418 CURLcode messageStatus = curl_easy_perform(han_);
00419
00420 if(messageStatus!=0)
00421 {
00422 cerr << "curl perform failed for pushEvent" << endl;
00423 edm::LogError("pushEvent") << "curl perform failed for pushEvent. "
00424 << "Could not register: probably XDAQ not running on Storage Manager"
00425 << " at " << consumerName_;
00426 return false;
00427 }
00428
00429 if(data.d_.length() == 0)
00430 {
00431 return true;
00432 } else {
00433 if(data.d_.length() > 0) {
00434 std::vector<char> buf(1024);
00435 int len = data.d_.length();
00436 buf.resize(len);
00437 for (int i=0; i<len ; i++) buf[i] = data.d_[i];
00438 const unsigned int MAX_DUMP_LENGTH = 1000;
00439 edm::LogError("pushEvent") << "========================================";
00440 edm::LogError("pushEvent") << "Unexpected pushEvent response!";
00441 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00442 edm::LogError("pushEvent") << "Here is the raw text that was returned:";
00443 edm::LogError("pushEvent") << data.d_;
00444 }
00445 else {
00446 edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH <<
00447 " characters of the raw text that was returned:";
00448 edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00449 }
00450 edm::LogError("pushEvent") << "========================================";
00451 }
00452 }
00453 return false;
00454 }
00455
00456 void ConsumerPipe::clearQueue()
00457 {
00458 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00459 eventQueue_.clear();
00460 }
00461
00462 std::vector<std::string> ConsumerPipe::getTriggerRequest() const
00463 {
00464 return triggerSelection_;
00465 }
00466
00467 void ConsumerPipe::setRegistryWarning(std::string const& message)
00468 {
00469
00470 std::vector<char> warningBuff(message.size());;
00471 std::copy(message.begin(), message.end(), warningBuff.begin());
00472 setRegistryWarning(warningBuff);
00473 }
00474
00475 void ConsumerPipe::setRegistryWarning(std::vector<char> const& message)
00476 {
00477
00478
00479
00480
00481 registryWarningMessage_ = message;
00482 registryWarningWasReported_ = true;
00483 }
00484
00489 long long ConsumerPipe::getEventCount(STATS_TIME_FRAME timeFrame,
00490 STATS_SAMPLE_TYPE sampleType,
00491 double currentTime)
00492 {
00493 if (timeFrame == SHORT_TERM) {
00494 if (sampleType == QUEUED_EVENTS) {
00495 return shortTermQueuedCounter_->getSampleCount(currentTime);
00496 }
00497 else if (sampleType == DESIRED_EVENTS) {
00498 return shortTermDesiredCounter_->getSampleCount(currentTime);
00499 }
00500 else {
00501 return shortTermServedCounter_->getSampleCount(currentTime);
00502 }
00503 }
00504 else {
00505 if (sampleType == QUEUED_EVENTS) {
00506 return longTermQueuedCounter_->getSampleCount();
00507 }
00508 else if (sampleType == DESIRED_EVENTS) {
00509 return longTermDesiredCounter_->getSampleCount();
00510 }
00511 else {
00512 return longTermServedCounter_->getSampleCount();
00513 }
00514 }
00515 }
00516
00521 double ConsumerPipe::getEventRate(STATS_TIME_FRAME timeFrame,
00522 STATS_SAMPLE_TYPE sampleType,
00523 double currentTime)
00524 {
00525 if (timeFrame == SHORT_TERM) {
00526 if (sampleType == QUEUED_EVENTS) {
00527 return shortTermQueuedCounter_->getSampleRate(currentTime);
00528 }
00529 else if (sampleType == DESIRED_EVENTS) {
00530 return shortTermDesiredCounter_->getSampleRate(currentTime);
00531 }
00532 else {
00533 return shortTermServedCounter_->getSampleRate(currentTime);
00534 }
00535 }
00536 else {
00537 if (sampleType == QUEUED_EVENTS) {
00538 return longTermQueuedCounter_->getSampleRate(currentTime);
00539 }
00540 else if (sampleType == DESIRED_EVENTS) {
00541 return longTermDesiredCounter_->getSampleRate(currentTime);
00542 }
00543 else {
00544 return longTermServedCounter_->getSampleRate(currentTime);
00545 }
00546 }
00547 }
00548
00553 double ConsumerPipe::getDataRate(STATS_TIME_FRAME timeFrame,
00554 STATS_SAMPLE_TYPE sampleType,
00555 double currentTime)
00556 {
00557 if (timeFrame == SHORT_TERM) {
00558 if (sampleType == QUEUED_EVENTS) {
00559 return shortTermQueuedCounter_->getValueRate(currentTime);
00560 }
00561 else if (sampleType == DESIRED_EVENTS) {
00562 return shortTermDesiredCounter_->getValueRate(currentTime);
00563 }
00564 else {
00565 return shortTermServedCounter_->getValueRate(currentTime);
00566 }
00567 }
00568 else {
00569 if (sampleType == QUEUED_EVENTS) {
00570 return longTermQueuedCounter_->getValueRate(currentTime);
00571 }
00572 else if (sampleType == DESIRED_EVENTS) {
00573 return longTermDesiredCounter_->getValueRate(currentTime);
00574 }
00575 else {
00576 return longTermServedCounter_->getValueRate(currentTime);
00577 }
00578 }
00579 }
00580
00587 double ConsumerPipe::getDuration(STATS_TIME_FRAME timeFrame,
00588 STATS_SAMPLE_TYPE sampleType,
00589 double currentTime)
00590 {
00591 if (timeFrame == SHORT_TERM) {
00592 if (sampleType == QUEUED_EVENTS) {
00593 return shortTermQueuedCounter_->getDuration(currentTime);
00594 }
00595 else if (sampleType == DESIRED_EVENTS) {
00596 return shortTermDesiredCounter_->getDuration(currentTime);
00597 }
00598 else {
00599 return shortTermServedCounter_->getDuration(currentTime);
00600 }
00601 }
00602 else {
00603 if (sampleType == QUEUED_EVENTS) {
00604 return longTermQueuedCounter_->getDuration(currentTime);
00605 }
00606 else if (sampleType == DESIRED_EVENTS) {
00607 return longTermDesiredCounter_->getDuration(currentTime);
00608 }
00609 else {
00610 return longTermServedCounter_->getDuration(currentTime);
00611 }
00612 }
00613 }
00614
00622 double ConsumerPipe::getAverageQueueSize(STATS_TIME_FRAME timeFrame,
00623 STATS_SAMPLE_TYPE sampleType,
00624 double currentTime)
00625 {
00626 if (timeFrame == SHORT_TERM) {
00627 if (sampleType == QUEUED_EVENTS) {
00628 return stQueueSizeWhenQueuedCounter_->getValueAverage(currentTime);
00629 }
00630 else {
00631 return stQueueSizeWhenDesiredCounter_->getValueAverage(currentTime);
00632 }
00633 }
00634 else {
00635 if (sampleType == QUEUED_EVENTS) {
00636 return ltQueueSizeWhenQueuedCounter_->getValueAverage();
00637 }
00638 else {
00639 return ltQueueSizeWhenDesiredCounter_->getValueAverage();
00640 }
00641 }
00642 }