CMS 3D CMS Logo

DQMConsumerPipe.cc

Go to the documentation of this file.
00001 
00012 #include "EventFilter/StorageManager/interface/DQMConsumerPipe.h"
00013 #include "FWCore/Utilities/interface/DebugMacros.h"
00014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00015 
00016 // keep this for debugging
00017 //#include "IOPool/Streamer/interface/DumpTools.h"
00018 
00019 using namespace std;
00020 using namespace stor;
00021 using namespace edm;
00022 
/// Next consumer ID to hand out; starts at 1. The constructor copies it into
/// consumerId_ and then increments it while holding rootIdLock_.
00026 uint32 DQMConsumerPipe::rootId_ = 1;
00027 
/// Mutex the constructor locks before reading and incrementing rootId_.
00031 boost::mutex DQMConsumerPipe::rootIdLock_;
00032 
00036 DQMConsumerPipe::DQMConsumerPipe(std::string name, std::string priority,
00037                                  int activeTimeout, int idleTimeout,
00038                                  std::string folderName, std::string hostName,
00039                                  int queueSize):
00040   han_(curl_easy_init()),
00041   headers_(),
00042   consumerName_(name),consumerPriority_(priority),
00043   topFolderName_(folderName),
00044   hostName_(hostName),
00045   events_(0),
00046   pushEventFailures_(0),
00047   maxQueueSize_(queueSize)
00048 {
00049   // initialize the time values we use for defining "states"
00050   timeToIdleState_ = activeTimeout;
00051   timeToDisconnectedState_ = activeTimeout + idleTimeout;
00052   lastEventRequestTime_ = time(NULL);
00053   initializationDone = false;
00054   pushMode_ = false;
00055   if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true;
00056 
00057   // determine if we're connected to a proxy server
00058   consumerIsProxyServer_ = false;
00059   //if (consumerName_ == PROXY_SERVER_NAME)
00060   if (consumerName_.find("urn") != std::string::npos &&
00061       consumerName_.find("xdaq") != std::string::npos &&
00062       consumerName_.find("pushEventData") != std::string::npos)
00063   {
00064     consumerIsProxyServer_ = true;
00065   }
00066 
00067   // assign the consumer ID
00068   boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_);
00069   consumerId_ = rootId_;
00070   rootId_++;
00071 
00072   if(han_==0)
00073   {
00074     edm::LogError("DQMConsumerPipe") << "Could not create curl handle";
00075     //std::cout << "Could not create curl handle" << std::endl;
00076     // throw exception here when we can make the SM go to a fail state from
00077     // another thread
00078   } else {
00079     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00080     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00081     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00082     // for pthttps but we don't need the Expect: 100 continue anyway
00083     headers_ = curl_slist_append(headers_, "Expect:");
00084     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00085     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00086     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00087     // debug options
00088     //setopt(han_,CURLOPT_VERBOSE, 1);
00089     //setopt(han_,CURLOPT_TCP_NODELAY, 1);
00090   }
00091 }
00092 
00096 DQMConsumerPipe::~DQMConsumerPipe()
00097 {
00098   FDEBUG(5) << "Executing destructor for DQM consumer pipe with ID = " <<
00099     consumerId_ << std::endl;
00100   curl_slist_free_all(headers_);
00101   curl_easy_cleanup(han_);
00102 }
00103 
00107 uint32 DQMConsumerPipe::getConsumerId() const
00108 {
00109   return consumerId_;
00110 }
00111 
00117 void DQMConsumerPipe::initializeSelection()
00118 {
00119   FDEBUG(5) << "Initializing DQM consumer pipe, ID = " <<
00120     consumerId_ << std::endl;
00121 
00122   // no need for initialization yet
00123   initializationDone = true;
00124 
00125 }
00126 
00132 bool DQMConsumerPipe::isIdle() const
00133 {
00134   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00135   return (timeDiff >= timeToIdleState_ &&
00136           timeDiff <  timeToDisconnectedState_);
00137 }
00138 
00142 bool DQMConsumerPipe::isDisconnected() const
00143 {
00144   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00145   return (timeDiff >= timeToDisconnectedState_);
00146 }
00147 
00151 bool DQMConsumerPipe::isReadyForEvent() const
00152 {
00153   // 13-Oct-2006, KAB - we're not ready if we haven't been initialized
00154   if (! initializationDone) return false;
00155 
00156   // for now, just test if we are in the active state
00157   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00158   return (timeDiff < timeToIdleState_);
00159 }
00160 
00164 bool DQMConsumerPipe::wantsDQMEvent(DQMEventMsgView const& eventView) const
00165 {
00166   // for now, only allow one top folder selection or "*"
00167   std::string meansEverything = "*";
00168   if(topFolderName_.compare(meansEverything) == 0) return true;
00169   else return (topFolderName_.compare(eventView.topFolderName()) == 0);
00170 }
00171 
00175 void DQMConsumerPipe::putDQMEvent(boost::shared_ptr< std::vector<char> > bufPtr)
00176 {
00177   // update the local pointer to the most recent event
00178   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00179   eventQueue_.push_back(bufPtr);
00180   while (eventQueue_.size() > maxQueueSize_) {
00181     eventQueue_.pop_front();
00182   }
00183   // actually push out DQM data if this is a push mode consumer (SMProxyServer)
00184   if(pushMode_) {
00185     bool success = pushEvent();
00186     // update the time of the most recent successful transaction
00187     if(!success) ++pushEventFailures_;
00188     else 
00189     {
00190       lastEventRequestTime_ = time(NULL);
00191       ++events_;
00192     }
00193   }
00194 }
00195 
00201 boost::shared_ptr< std::vector<char> > DQMConsumerPipe::getDQMEvent()
00202 {
00203   // clear out any stale event(s)
00204   if (isIdle() || isDisconnected())
00205   {
00206     this->clearQueue();
00207   }
00208 
00209   // fetch the most recent event
00210   boost::shared_ptr< std::vector<char> > bufPtr;
00211   {
00212     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00213     if (! eventQueue_.empty())
00214     {
00215       bufPtr = eventQueue_.front();
00216       eventQueue_.pop_front();
00217       ++events_;
00218     }
00219   }
00220 
00221   // update the time of the most recent request
00222   lastEventRequestTime_ = time(NULL);
00223 
00224   // return the event
00225   return bufPtr;
00226 }
00227 
00228 bool DQMConsumerPipe::pushEvent()
00229 {
00230   // fetch the most recent event
00231   boost::shared_ptr< std::vector<char> > bufPtr;
00232   {
00233     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00234     if (! eventQueue_.empty())
00235     {
00236       bufPtr = eventQueue_.front();
00237       eventQueue_.pop_front();
00238     }
00239   }
00240   if (bufPtr.get() == NULL)
00241   {
00242     edm::LogError("pushEvent") << "========================================";
00243     edm::LogError("pushEvent") << "pushEvent called with empty event queue!";
00244     return false;
00245   }
00246 
00247   // push the next event out to a push mode consumer (SMProxyServer)
00248   FDEBUG(5) << "pushing out DQMevent to " << consumerName_ << std::endl;
00249   stor::ReadData data;
00250 
00251   data.d_.clear();
00252   // check if curl handle was obtained (at ctor) if not try again
00253   if(han_==0)
00254   {
00255     han_ = curl_easy_init();
00256     if(han_==0)
00257     {
00258       edm::LogError("pushEvent") << "Could not create curl handle";
00259       return false;
00260     }
00261     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00262     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00263     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00264     headers_ = curl_slist_append(headers_, "Expect:");
00265     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00266     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00267     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00268   }
00269   setopt(han_,CURLOPT_WRITEDATA,&data);
00270 
00271   // build the event message
00272   DQMEventMsgView msgView(&(*bufPtr)[0]);
00273 
00274   // add the request message as a https post
00275   setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress());
00276   setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size());
00277 
00278   // send the HTTP POST, read the reply
00279   // explicitly close connection when using pthttps transport or sometimes it hangs
00280   // because somtimes curl does not see that the connection was closed and tries to reuse it
00281   setopt(han_,CURLOPT_FORBID_REUSE, 1);
00282   CURLcode messageStatus = curl_easy_perform(han_);
00283 
00284   if(messageStatus!=0)
00285   {
00286     cerr << "curl perform failed for pushDQMEvent" << endl;
00287     edm::LogError("pushEvent") << "curl perform failed for pushDQMEvent. "
00288         << "Could not register: probably XDAQ not running on Storage Manager"
00289         << " at " << consumerName_;
00290     return false;
00291   }
00292   // should really read the message to see if okay (if we had made one!)
00293   if(data.d_.length() == 0)
00294   {
00295     return true;
00296   } else {
00297     if(data.d_.length() > 0) {
00298       std::vector<char> buf(1024);
00299       int len = data.d_.length();
00300       buf.resize(len);
00301       for (int i=0; i<len ; i++) buf[i] = data.d_[i];
00302       const unsigned int MAX_DUMP_LENGTH = 1000;
00303       edm::LogError("pushEvent") << "========================================";
00304       edm::LogError("pushEvent") << "Unexpected pushDQMEvent response!";
00305       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00306         edm::LogError("pushEvent") << "Here is the raw text that was returned:";
00307         edm::LogError("pushEvent") << data.d_;
00308       }
00309       else {
00310         edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH <<
00311           " characters of the raw text that was returned:";
00312         edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00313       }
00314       edm::LogError("pushEvent") << "========================================";
00315     }
00316   }
00317   return false;
00318 }
00319 
00320 void DQMConsumerPipe::clearQueue()
00321 {
00322   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00323   eventQueue_.clear();
00324 }

Generated on Tue Jun 9 17:34:55 2009 for CMSSW by  doxygen 1.5.4