00001
00012 #include "EventFilter/StorageManager/interface/DQMConsumerPipe.h"
00013 #include "FWCore/Utilities/interface/DebugMacros.h"
00014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00015
00016
00017
00018
00019 using namespace std;
00020 using namespace stor;
00021 using namespace edm;
00022
00026 uint32 DQMConsumerPipe::rootId_ = 1;
00027
00031 boost::mutex DQMConsumerPipe::rootIdLock_;
00032
00036 DQMConsumerPipe::DQMConsumerPipe(std::string name, std::string priority,
00037 int activeTimeout, int idleTimeout,
00038 std::string folderName, std::string hostName,
00039 int queueSize):
00040 han_(curl_easy_init()),
00041 headers_(),
00042 consumerName_(name),consumerPriority_(priority),
00043 topFolderName_(folderName),
00044 hostName_(hostName),
00045 events_(0),
00046 pushEventFailures_(0),
00047 maxQueueSize_(queueSize)
00048 {
00049
00050 timeToIdleState_ = activeTimeout;
00051 timeToDisconnectedState_ = activeTimeout + idleTimeout;
00052 lastEventRequestTime_ = time(NULL);
00053 initializationDone = false;
00054 pushMode_ = false;
00055 if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true;
00056
00057
00058 consumerIsProxyServer_ = false;
00059
00060 if (consumerName_.find("urn") != std::string::npos &&
00061 consumerName_.find("xdaq") != std::string::npos &&
00062 consumerName_.find("pushEventData") != std::string::npos)
00063 {
00064 consumerIsProxyServer_ = true;
00065 }
00066
00067
00068 boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_);
00069 consumerId_ = rootId_;
00070 rootId_++;
00071
00072 if(han_==0)
00073 {
00074 edm::LogError("DQMConsumerPipe") << "Could not create curl handle";
00075
00076
00077
00078 } else {
00079 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00080 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00081
00082
00083 headers_ = curl_slist_append(headers_, "Expect:");
00084 setopt(han_, CURLOPT_HTTPHEADER, headers_);
00085 setopt(han_, CURLOPT_URL, consumerName_.c_str());
00086 setopt(han_, CURLOPT_WRITEFUNCTION, func);
00087
00088
00089
00090 }
00091 }
00092
00096 DQMConsumerPipe::~DQMConsumerPipe()
00097 {
00098 FDEBUG(5) << "Executing destructor for DQM consumer pipe with ID = " <<
00099 consumerId_ << std::endl;
00100 curl_slist_free_all(headers_);
00101 curl_easy_cleanup(han_);
00102 }
00103
00107 uint32 DQMConsumerPipe::getConsumerId() const
00108 {
00109 return consumerId_;
00110 }
00111
00117 void DQMConsumerPipe::initializeSelection()
00118 {
00119 FDEBUG(5) << "Initializing DQM consumer pipe, ID = " <<
00120 consumerId_ << std::endl;
00121
00122
00123 initializationDone = true;
00124
00125 }
00126
00132 bool DQMConsumerPipe::isIdle() const
00133 {
00134 time_t timeDiff = time(NULL) - lastEventRequestTime_;
00135 return (timeDiff >= timeToIdleState_ &&
00136 timeDiff < timeToDisconnectedState_);
00137 }
00138
00142 bool DQMConsumerPipe::isDisconnected() const
00143 {
00144 time_t timeDiff = time(NULL) - lastEventRequestTime_;
00145 return (timeDiff >= timeToDisconnectedState_);
00146 }
00147
00151 bool DQMConsumerPipe::isReadyForEvent() const
00152 {
00153
00154 if (! initializationDone) return false;
00155
00156
00157 time_t timeDiff = time(NULL) - lastEventRequestTime_;
00158 return (timeDiff < timeToIdleState_);
00159 }
00160
00164 bool DQMConsumerPipe::wantsDQMEvent(DQMEventMsgView const& eventView) const
00165 {
00166
00167 std::string meansEverything = "*";
00168 if(topFolderName_.compare(meansEverything) == 0) return true;
00169 else return (topFolderName_.compare(eventView.topFolderName()) == 0);
00170 }
00171
00175 void DQMConsumerPipe::putDQMEvent(boost::shared_ptr< std::vector<char> > bufPtr)
00176 {
00177
00178 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00179 eventQueue_.push_back(bufPtr);
00180 while (eventQueue_.size() > maxQueueSize_) {
00181 eventQueue_.pop_front();
00182 }
00183
00184 if(pushMode_) {
00185 bool success = pushEvent();
00186
00187 if(!success) ++pushEventFailures_;
00188 else
00189 {
00190 lastEventRequestTime_ = time(NULL);
00191 ++events_;
00192 }
00193 }
00194 }
00195
00201 boost::shared_ptr< std::vector<char> > DQMConsumerPipe::getDQMEvent()
00202 {
00203
00204 if (isIdle() || isDisconnected())
00205 {
00206 this->clearQueue();
00207 }
00208
00209
00210 boost::shared_ptr< std::vector<char> > bufPtr;
00211 {
00212 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00213 if (! eventQueue_.empty())
00214 {
00215 bufPtr = eventQueue_.front();
00216 eventQueue_.pop_front();
00217 ++events_;
00218 }
00219 }
00220
00221
00222 lastEventRequestTime_ = time(NULL);
00223
00224
00225 return bufPtr;
00226 }
00227
00228 bool DQMConsumerPipe::pushEvent()
00229 {
00230
00231 boost::shared_ptr< std::vector<char> > bufPtr;
00232 {
00233 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00234 if (! eventQueue_.empty())
00235 {
00236 bufPtr = eventQueue_.front();
00237 eventQueue_.pop_front();
00238 }
00239 }
00240 if (bufPtr.get() == NULL)
00241 {
00242 edm::LogError("pushEvent") << "========================================";
00243 edm::LogError("pushEvent") << "pushEvent called with empty event queue!";
00244 return false;
00245 }
00246
00247
00248 FDEBUG(5) << "pushing out DQMevent to " << consumerName_ << std::endl;
00249 stor::ReadData data;
00250
00251 data.d_.clear();
00252
00253 if(han_==0)
00254 {
00255 han_ = curl_easy_init();
00256 if(han_==0)
00257 {
00258 edm::LogError("pushEvent") << "Could not create curl handle";
00259 return false;
00260 }
00261 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00262 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00263
00264 headers_ = curl_slist_append(headers_, "Expect:");
00265 setopt(han_, CURLOPT_HTTPHEADER, headers_);
00266 setopt(han_, CURLOPT_URL, consumerName_.c_str());
00267 setopt(han_, CURLOPT_WRITEFUNCTION, func);
00268 }
00269 setopt(han_,CURLOPT_WRITEDATA,&data);
00270
00271
00272 DQMEventMsgView msgView(&(*bufPtr)[0]);
00273
00274
00275 setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress());
00276 setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size());
00277
00278
00279
00280
00281 setopt(han_,CURLOPT_FORBID_REUSE, 1);
00282 CURLcode messageStatus = curl_easy_perform(han_);
00283
00284 if(messageStatus!=0)
00285 {
00286 cerr << "curl perform failed for pushDQMEvent" << endl;
00287 edm::LogError("pushEvent") << "curl perform failed for pushDQMEvent. "
00288 << "Could not register: probably XDAQ not running on Storage Manager"
00289 << " at " << consumerName_;
00290 return false;
00291 }
00292
00293 if(data.d_.length() == 0)
00294 {
00295 return true;
00296 } else {
00297 if(data.d_.length() > 0) {
00298 std::vector<char> buf(1024);
00299 int len = data.d_.length();
00300 buf.resize(len);
00301 for (int i=0; i<len ; i++) buf[i] = data.d_[i];
00302 const unsigned int MAX_DUMP_LENGTH = 1000;
00303 edm::LogError("pushEvent") << "========================================";
00304 edm::LogError("pushEvent") << "Unexpected pushDQMEvent response!";
00305 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00306 edm::LogError("pushEvent") << "Here is the raw text that was returned:";
00307 edm::LogError("pushEvent") << data.d_;
00308 }
00309 else {
00310 edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH <<
00311 " characters of the raw text that was returned:";
00312 edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00313 }
00314 edm::LogError("pushEvent") << "========================================";
00315 }
00316 }
00317 return false;
00318 }
00319
00320 void DQMConsumerPipe::clearQueue()
00321 {
00322 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00323 eventQueue_.clear();
00324 }