CMS 3D CMS Logo

ConsumerPipe.cc

Go to the documentation of this file.
00001 
00010 #include "EventFilter/StorageManager/interface/ConsumerPipe.h"
00011 #include "FWCore/Utilities/interface/DebugMacros.h"
00012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00013 
00014 // keep this for debugging
00015 //#include "IOPool/Streamer/interface/DumpTools.h"
00016 
00017 using namespace std;
00018 using namespace stor;
00019 using namespace edm;
00020 
00024 uint32 ConsumerPipe::rootId_ = 1;
00025 
00029 boost::mutex ConsumerPipe::rootIdLock_;
00030 
00034 const double ConsumerPipe::MAX_ACCEPT_INTERVAL = 86400.0;  // seconds in 1 day
00035 
00039 ConsumerPipe::ConsumerPipe(std::string name, std::string priority,
00040                            int activeTimeout, int idleTimeout,
00041                            Strings triggerSelection, double rateRequest,
00042                            std::string hltOutputSelection,
00043                            std::string hostName, int queueSize):
00044   han_(curl_easy_init()),
00045   headers_(),
00046   consumerName_(name),consumerPriority_(priority),
00047   events_(0),
00048   triggerSelection_(triggerSelection),
00049   rateRequest_(rateRequest),
00050   hltOutputSelection_(hltOutputSelection),
00051   hostName_(hostName),
00052   pushEventFailures_(0),
00053   maxQueueSize_(queueSize)
00054 {
00055   // initialize the time values we use for defining "states"
00056   timeToIdleState_ = activeTimeout;
00057   timeToDisconnectedState_ = activeTimeout + idleTimeout;
00058   lastEventRequestTime_ = time(NULL);
00059   initializationDone = false;
00060   pushMode_ = false;
00061   if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true;
00062   registryWarningWasReported_ = false;
00063 
00064   // determine if we're connected to a proxy server
00065   consumerIsProxyServer_ = false;
00066   //if (consumerName_ == PROXY_SERVER_NAME)
00067   if (consumerName_.find("urn") != std::string::npos &&
00068       consumerName_.find("xdaq") != std::string::npos &&
00069       consumerName_.find("pushEventData") != std::string::npos)
00070   {
00071     consumerIsProxyServer_ = true;
00072   }
00073 
00074   // assign the consumer ID
00075   boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_);
00076   consumerId_ = rootId_;
00077   rootId_++;
00078 
00079   if(han_==0)
00080   {
00081     edm::LogError("ConsumerPipe") << "Could not create curl handle";
00082     //std::cout << "Could not create curl handle" << std::endl;
00083     // throw exception here when we can make the SM go to a fail state from
00084     // another thread
00085   } else {
00086     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00087     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00088     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00089     // for pthttps but we don't need the Expect: 100 continue anyway
00090     headers_ = curl_slist_append(headers_, "Expect:");
00091     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00092     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00093     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00094     // debug options
00095     //setopt(han_,CURLOPT_VERBOSE, 1);
00096     //setopt(han_,CURLOPT_TCP_NODELAY, 1);
00097   }
00098 
00099   // determine the amount of time that we need to wait between accepted
00100   // events.  The request rate specified to this constructor is
00101   // converted to an interval that is used internally, and the interval
00102   // is required to be somewhat reasonable.
00103   if (rateRequest_ < (1.0 / MAX_ACCEPT_INTERVAL))
00104   {
00105     minTimeBetweenEvents_ = MAX_ACCEPT_INTERVAL;
00106   }
00107   else
00108   {
00109     minTimeBetweenEvents_ = 1.0 / rateRequest_;  // seconds
00110   }
00111   lastConsideredEventTime_ = BaseCounter::getCurrentTime();
00112   rateRequestCounter_.reset(new RollingSampleCounter(50,1,60,RollingSampleCounter::INCLUDE_SAMPLES_IMMEDIATELY));
00113 
00114   // initialize the counters that we use for statistics
00115   longTermDesiredCounter_.reset(new ForeverCounter());
00116   shortTermDesiredCounter_.reset(new RollingIntervalCounter(180,5,20));
00117   longTermQueuedCounter_.reset(new ForeverCounter());
00118   shortTermQueuedCounter_.reset(new RollingIntervalCounter(180,5,20));
00119   longTermServedCounter_.reset(new ForeverCounter());
00120   shortTermServedCounter_.reset(new RollingIntervalCounter(180,5,20));
00121 
00122   ltQueueSizeWhenDesiredCounter_.reset(new ForeverCounter());
00123   stQueueSizeWhenDesiredCounter_.reset(new RollingIntervalCounter(180,5,20));
00124   ltQueueSizeWhenQueuedCounter_.reset(new ForeverCounter());
00125   stQueueSizeWhenQueuedCounter_.reset(new RollingIntervalCounter(180,5,20));
00126 }
00127 
00131 ConsumerPipe::~ConsumerPipe()
00132 {
00133   FDEBUG(5) << "Executing destructor for consumer pipe with ID = " <<
00134     consumerId_ << std::endl;
00135   curl_slist_free_all(headers_);
00136   curl_easy_cleanup(han_);
00137 }
00138 
00142 uint32 ConsumerPipe::getConsumerId() const
00143 {
00144   return consumerId_;
00145 }
00146 
00152 void ConsumerPipe::initializeSelection(Strings const& fullTriggerList,
00153                                        uint32 outputModuleId)
00154 {
00155   FDEBUG(5) << "Initializing consumer pipe, ID = " <<
00156     consumerId_ << std::endl;
00157 
00158   // store the output module id
00159   hltOutputModuleId_ = outputModuleId;
00160 
00161   // create our event selector
00162   eventSelector_.reset(new EventSelector(triggerSelection_,
00163                                          fullTriggerList));
00164   // indicate that initialization is complete
00165   initializationDone = true;
00166 
00167 }
00168 
00173 bool ConsumerPipe::isActive() const
00174 {
00175   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00176   return (timeDiff < timeToIdleState_);
00177 }
00178 
00184 bool ConsumerPipe::isIdle() const
00185 {
00186   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00187   return (timeDiff >= timeToIdleState_ &&
00188           timeDiff <  timeToDisconnectedState_);
00189 }
00190 
00194 bool ConsumerPipe::isDisconnected() const
00195 {
00196   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00197   return (timeDiff >= timeToDisconnectedState_);
00198 }
00199 
00207 bool ConsumerPipe::isReadyForEvent(double currentTime) const
00208 {
00209   // we're not ready if we haven't yet been initialized
00210   // or are no longer active
00211   if (! initializationDone) {return false;}
00212   if (! this->isActive()) {return false;}
00213 
00214   // check if enough time has elapsed since the last event was considered.
00215   // 16-Apr-2008, KAB:  The simple method here would be to always use
00216   // "currentTime - lastTime" is greater-than-or-equal-to the minimum time
00217   // between events.  However, that doesn't provide enough rate, in my
00218   // opinion.  For example, let's say that the rate of triggers for a
00219   // particular consumer is 1.0 Hz, and the consumer rate request is 10 Hz.
00220   // The simple method will undershoot the 1.0 Hz because occasionally
00221   // an event is just under the 1.0 sec cutoff.  Using a longer time frame
00222   // (with a RollingSampleCounter or something else) allows us to provide
00223   // a more accurate requested rate.
00224   if (! rateRequestCounter_->hasValidResult()) {
00225     return ((currentTime - lastConsideredEventTime_) >= minTimeBetweenEvents_);
00226   }
00227   else {
00228     return (rateRequestCounter_->getSampleRate(currentTime) < rateRequest_);
00229   }
00230 }
00231 
00238 bool ConsumerPipe::wantsEvent(EventMsgView const& eventView) const
00239 {
00240   // we're not interested in events if we haven't yet been initialized
00241   // or are no longer active
00242   if (! initializationDone) {return false;}
00243   if (! this->isActive()) {return false;}
00244 
00245   // the event must be from the correct HLT output module
00246   if (! this->isProxyServer() &&
00247       eventView.outModId() != hltOutputModuleId_) {return false;}
00248 
00249   // get trigger bits for this event and check using eventSelector_
00250   std::vector<unsigned char> hlt_out;
00251   hlt_out.resize(1 + (eventView.hltCount()-1)/4);
00252   eventView.hltTriggerBits(&hlt_out[0]);
00253   int num_paths = eventView.hltCount();
00254   bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths));
00255 
00256   // if we want this event, add it to our statistics for "desired"
00257   // or "acceptable" events.
00258   if (rc) {
00259     double now = BaseCounter::getCurrentTime();
00260     double sizeInMB = static_cast<double>(eventView.size()) / 1048576.0;
00261     ltQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size());
00262     stQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size(), now);
00263     longTermDesiredCounter_->addSample(sizeInMB);
00264     shortTermDesiredCounter_->addSample(sizeInMB, now);
00265   }
00266   return rc;
00267 }
00268 
00283 void ConsumerPipe::wasConsidered(double currentTime)
00284 {
00285   lastConsideredEventTime_ = currentTime;
00286   rateRequestCounter_->addSample(1.0, currentTime);
00287 }
00288 
00292 void ConsumerPipe::putEvent(boost::shared_ptr< std::vector<char> > bufPtr)
00293 {
00294   // add this event to the queue
00295   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00296 
00297   // add the event to our statistics for "queued" events
00298   double now = BaseCounter::getCurrentTime();
00299   double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0;
00300   ltQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size());
00301   stQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size(), now);
00302   longTermQueuedCounter_->addSample(sizeInMB);
00303   shortTermQueuedCounter_->addSample(sizeInMB, now);
00304 
00305   eventQueue_.push_back(bufPtr);
00306 
00307   while (eventQueue_.size() > maxQueueSize_) {
00308     eventQueue_.pop_front();
00309   }
00310   // if a push mode consumer actually push the event out to SMProxyServer
00311   if(pushMode_) {
00312     bool success = pushEvent();
00313     // update the time of the most recent successful transaction
00314     if(!success) ++pushEventFailures_;
00315     else
00316     {
00317       lastEventRequestTime_ = time(NULL);
00318       ++events_;
00319       // add the event to our statistics for "served" events
00320       longTermServedCounter_->addSample(sizeInMB);
00321       shortTermServedCounter_->addSample(sizeInMB, now);
00322     }
00323   }
00324 }
00325 
00331 boost::shared_ptr< std::vector<char> > ConsumerPipe::getEvent()
00332 {
00333   // 25-Aug-2005, KAB: clear out any stale event(s)
00334   if (isIdle() || isDisconnected())
00335   {
00336     this->clearQueue();
00337   }
00338 
00339   // fetch the most recent event
00340   boost::shared_ptr< std::vector<char> > bufPtr;
00341   {
00342     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00343     if (! eventQueue_.empty())
00344     {
00345       bufPtr = eventQueue_.front();
00346       eventQueue_.pop_front();
00347 
00348       // add the event to our statistics for "served" events
00349       ++events_;
00350       double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0;
00351       longTermServedCounter_->addSample(sizeInMB);
00352       shortTermServedCounter_->addSample(sizeInMB);
00353     }
00354   }
00355 
00356   // update the time of the most recent request
00357   lastEventRequestTime_ = time(NULL);
00358 
00359   // return the event
00360   return bufPtr;
00361 }
00362 
00363 bool ConsumerPipe::pushEvent()
00364 {
00365   // fetch the most recent event
00366   boost::shared_ptr< std::vector<char> > bufPtr;
00367   {
00368     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00369     if (! eventQueue_.empty())
00370     {
00371       bufPtr = eventQueue_.front();
00372       eventQueue_.pop_front();
00373     }
00374   }
00375   if (bufPtr.get() == NULL)
00376   {
00377     edm::LogError("pushEvent") << "========================================";
00378     edm::LogError("pushEvent") << "pushEvent called with empty event queue!";
00379     return false;
00380   }
00381 
00382   // push the next event out to a push mode consumer (SMProxyServer)
00383   FDEBUG(5) << "pushing out event to " << consumerName_ << std::endl;
00384   stor::ReadData data;
00385 
00386   data.d_.clear();
00387   // check if curl handle was obtained (at ctor) if not try again
00388   if(han_==0)
00389   {
00390     han_ = curl_easy_init();
00391     if(han_==0)
00392     {
00393       edm::LogError("pushEvent") << "Could not create curl handle";
00394       return false;
00395     }
00396     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00397     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00398     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00399     headers_ = curl_slist_append(headers_, "Expect:");
00400     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00401     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00402     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00403   }
00404 
00405   setopt(han_,CURLOPT_WRITEDATA,&data);
00406 
00407   // build the event message
00408   EventMsgView msgView(&(*bufPtr)[0]);
00409 
00410   // add the request message as a https post
00411   setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress());
00412   setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size());
00413 
00414   // send the HTTP POST, read the reply
00415   // explicitly close connection when using pthttps transport or sometimes it hangs
00416   // because somtimes curl does not see that the connection was closed and tries to reuse it
00417   setopt(han_,CURLOPT_FORBID_REUSE, 1);
00418   CURLcode messageStatus = curl_easy_perform(han_);
00419 
00420   if(messageStatus!=0)
00421   {
00422     cerr << "curl perform failed for pushEvent" << endl;
00423     edm::LogError("pushEvent") << "curl perform failed for pushEvent. "
00424         << "Could not register: probably XDAQ not running on Storage Manager"
00425         << " at " << consumerName_;
00426     return false;
00427   }
00428   // should really read the message to see if okay (if we had made one!)
00429   if(data.d_.length() == 0)
00430   {
00431     return true;
00432   } else {
00433     if(data.d_.length() > 0) {
00434       std::vector<char> buf(1024);
00435       int len = data.d_.length();
00436       buf.resize(len);
00437       for (int i=0; i<len ; i++) buf[i] = data.d_[i];
00438       const unsigned int MAX_DUMP_LENGTH = 1000;
00439       edm::LogError("pushEvent") << "========================================";
00440       edm::LogError("pushEvent") << "Unexpected pushEvent response!";
00441       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00442         edm::LogError("pushEvent") << "Here is the raw text that was returned:";
00443         edm::LogError("pushEvent") << data.d_;
00444       }
00445       else {
00446         edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH <<
00447           " characters of the raw text that was returned:";
00448         edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00449       }
00450       edm::LogError("pushEvent") << "========================================";
00451     }
00452   }
00453   return false;
00454 }
00455 
00456 void ConsumerPipe::clearQueue()
00457 {
00458   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00459   eventQueue_.clear();
00460 }
00461 
00462 std::vector<std::string> ConsumerPipe::getTriggerRequest() const
00463 {
00464   return triggerSelection_;
00465 }
00466 
00467 void ConsumerPipe::setRegistryWarning(std::string const& message)
00468 {
00469   // convert the string to a vector of char and then pass off the work
00470   std::vector<char> warningBuff(message.size());;
00471   std::copy(message.begin(), message.end(), warningBuff.begin());
00472   setRegistryWarning(warningBuff);
00473 }
00474 
00475 void ConsumerPipe::setRegistryWarning(std::vector<char> const& message)
00476 {
00477   // assign the registry warning text before setting the warning flag
00478   // to avoid race conditions in which the hasRegistryWarning() method
00479   // would return true but the message hasn't been set (simpler than
00480   // adding a mutex for the warning message string)
00481   registryWarningMessage_ = message;
00482   registryWarningWasReported_ = true;
00483 }
00484 
00489 long long ConsumerPipe::getEventCount(STATS_TIME_FRAME timeFrame,
00490                                       STATS_SAMPLE_TYPE sampleType,
00491                                       double currentTime)
00492 {
00493   if (timeFrame == SHORT_TERM) {
00494     if (sampleType == QUEUED_EVENTS) {
00495       return shortTermQueuedCounter_->getSampleCount(currentTime);
00496     }
00497     else if (sampleType == DESIRED_EVENTS) {
00498       return shortTermDesiredCounter_->getSampleCount(currentTime);
00499     }
00500     else {
00501       return shortTermServedCounter_->getSampleCount(currentTime);
00502     }
00503   }
00504   else {
00505     if (sampleType == QUEUED_EVENTS) {
00506       return longTermQueuedCounter_->getSampleCount();
00507     }
00508     else if (sampleType == DESIRED_EVENTS) {
00509       return longTermDesiredCounter_->getSampleCount();
00510     }
00511     else {
00512       return longTermServedCounter_->getSampleCount();
00513     }
00514   }
00515 }
00516 
00521 double ConsumerPipe::getEventRate(STATS_TIME_FRAME timeFrame,
00522                                   STATS_SAMPLE_TYPE sampleType,
00523                                   double currentTime)
00524 {
00525   if (timeFrame == SHORT_TERM) {
00526     if (sampleType == QUEUED_EVENTS) {
00527       return shortTermQueuedCounter_->getSampleRate(currentTime);
00528     }
00529     else if (sampleType == DESIRED_EVENTS) {
00530       return shortTermDesiredCounter_->getSampleRate(currentTime);
00531     }
00532     else {
00533       return shortTermServedCounter_->getSampleRate(currentTime);
00534     }
00535   }
00536   else {
00537     if (sampleType == QUEUED_EVENTS) {
00538       return longTermQueuedCounter_->getSampleRate(currentTime);
00539     }
00540     else if (sampleType == DESIRED_EVENTS) {
00541       return longTermDesiredCounter_->getSampleRate(currentTime);
00542     }
00543     else {
00544       return longTermServedCounter_->getSampleRate(currentTime);
00545     }
00546   }
00547 }
00548 
00553 double ConsumerPipe::getDataRate(STATS_TIME_FRAME timeFrame,
00554                                  STATS_SAMPLE_TYPE sampleType,
00555                                  double currentTime)
00556 {
00557   if (timeFrame == SHORT_TERM) {
00558     if (sampleType == QUEUED_EVENTS) {
00559       return shortTermQueuedCounter_->getValueRate(currentTime);
00560     }
00561     else if (sampleType == DESIRED_EVENTS) {
00562       return shortTermDesiredCounter_->getValueRate(currentTime);
00563     }
00564     else {
00565       return shortTermServedCounter_->getValueRate(currentTime);
00566     }
00567   }
00568   else {
00569     if (sampleType == QUEUED_EVENTS) {
00570       return longTermQueuedCounter_->getValueRate(currentTime);
00571     }
00572     else if (sampleType == DESIRED_EVENTS) {
00573       return longTermDesiredCounter_->getValueRate(currentTime);
00574     }
00575     else {
00576       return longTermServedCounter_->getValueRate(currentTime);
00577     }
00578   }
00579 }
00580 
00587 double ConsumerPipe::getDuration(STATS_TIME_FRAME timeFrame,
00588                                  STATS_SAMPLE_TYPE sampleType,
00589                                  double currentTime)
00590 {
00591   if (timeFrame == SHORT_TERM) {
00592     if (sampleType == QUEUED_EVENTS) {
00593       return shortTermQueuedCounter_->getDuration(currentTime);
00594     }
00595     else if (sampleType == DESIRED_EVENTS) {
00596       return shortTermDesiredCounter_->getDuration(currentTime);
00597     }
00598     else {
00599       return shortTermServedCounter_->getDuration(currentTime);
00600     }
00601   }
00602   else {
00603     if (sampleType == QUEUED_EVENTS) {
00604       return longTermQueuedCounter_->getDuration(currentTime);
00605     }
00606     else if (sampleType == DESIRED_EVENTS) {
00607       return longTermDesiredCounter_->getDuration(currentTime);
00608     }
00609     else {
00610       return longTermServedCounter_->getDuration(currentTime);
00611     }
00612   }
00613 }
00614 
00622 double ConsumerPipe::getAverageQueueSize(STATS_TIME_FRAME timeFrame,
00623                                          STATS_SAMPLE_TYPE sampleType,
00624                                          double currentTime)
00625 {
00626   if (timeFrame == SHORT_TERM) {
00627     if (sampleType == QUEUED_EVENTS) {
00628       return stQueueSizeWhenQueuedCounter_->getValueAverage(currentTime);
00629     }
00630     else {
00631       return stQueueSizeWhenDesiredCounter_->getValueAverage(currentTime);
00632     }
00633   }
00634   else {
00635     if (sampleType == QUEUED_EVENTS) {
00636       return ltQueueSizeWhenQueuedCounter_->getValueAverage();
00637     }
00638     else {
00639       return ltQueueSizeWhenDesiredCounter_->getValueAverage();
00640     }
00641   }
00642 }

Generated on Tue Jun 9 17:34:55 2009 for CMSSW by  doxygen 1.5.4