#include <EventFilter/StorageManager/interface/DQMConsumerPipe.h>
Public Member Functions | |
void | clearQueue () |
DQMConsumerPipe (std::string name, std::string priority, int activeTimeout, int idleTimeout, std::string folderName, std::string hostName, int queueSize) | |
DQMConsumerPipe constructor. | |
uint32 | getConsumerId () const |
Returns the consumer ID associated with this pipe. | |
std::string | getConsumerName () |
boost::shared_ptr< std::vector < char > > | getDQMEvent () |
Fetches the next event from this consumer pipe. | |
int | getEvents () |
std::string | getHostName () |
time_t | getLastEventRequestTime () |
int | getPushEventFailures () |
void | initializeSelection () |
Initializes the event selection for this consumer based on the list of available triggers stored in the specified InitMsgView and the request ParameterSet that was specified in the constructor. | |
bool | isDisconnected () const |
Tests if the consumer has disconnected. | |
bool | isIdle () const |
Tests if the consumer is idle (as opposed to active). | |
bool | isProxyServer () const |
bool | isReadyForEvent () const |
Tests if the consumer is ready for an event. | |
void | putDQMEvent (boost::shared_ptr< std::vector< char > > bufPtr) |
Adds the specified event to this consumer pipe. | |
void | setPushMode (bool mode) |
bool | wantsDQMEvent (DQMEventMsgView const &eventView) const |
Tests if the consumer wants the specified event. | |
~DQMConsumerPipe () | |
DQMConsumerPipe destructor. | |
Private Member Functions | |
bool | pushEvent () |
Private Attributes | |
uint32 | consumerId_ |
bool | consumerIsProxyServer_ |
std::string | consumerName_ |
std::string | consumerPriority_ |
std::deque< boost::shared_ptr < std::vector< char > > > | eventQueue_ |
boost::mutex | eventQueueLock_ |
int | events_ |
CURL * | han_ |
struct curl_slist * | headers_ |
std::string | hostName_ |
bool | initializationDone |
time_t | lastEventRequestTime_ |
unsigned int | maxQueueSize_ |
unsigned int | pushEventFailures_ |
bool | pushMode_ |
int | timeToDisconnectedState_ |
int | timeToIdleState_ |
std::string | topFolderName_ |
Static Private Attributes | |
static uint32 | rootId_ = 1 |
Initialize the static value for the root consumer id. | |
static boost::mutex | rootIdLock_ |
Initialize the static lock used to control access to the root ID. |
Definition at line 42 of file DQMConsumerPipe.h.
DQMConsumerPipe::DQMConsumerPipe | ( | std::string | name, | |
std::string | priority, | |||
int | activeTimeout, | |||
int | idleTimeout, | |||
std::string | folderName, | |||
std::string | hostName, | |||
int | queueSize | |||
) |
DQMConsumerPipe constructor.
Definition at line 36 of file DQMConsumerPipe.cc.
References consumerId_, consumerIsProxyServer_, consumerName_, consumerPriority_, stor::func(), han_, headers_, initializationDone, lastEventRequestTime_, NULL, pushMode_, rootId_, rootIdLock_, stor::setopt(), timeToDisconnectedState_, and timeToIdleState_.
00039 : 00040 han_(curl_easy_init()), 00041 headers_(), 00042 consumerName_(name),consumerPriority_(priority), 00043 topFolderName_(folderName), 00044 hostName_(hostName), 00045 events_(0), 00046 pushEventFailures_(0), 00047 maxQueueSize_(queueSize) 00048 { 00049 // initialize the time values we use for defining "states" 00050 timeToIdleState_ = activeTimeout; 00051 timeToDisconnectedState_ = activeTimeout + idleTimeout; 00052 lastEventRequestTime_ = time(NULL); 00053 initializationDone = false; 00054 pushMode_ = false; 00055 if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true; 00056 00057 // determine if we're connected to a proxy server 00058 consumerIsProxyServer_ = false; 00059 //if (consumerName_ == PROXY_SERVER_NAME) 00060 if (consumerName_.find("urn") != std::string::npos && 00061 consumerName_.find("xdaq") != std::string::npos && 00062 consumerName_.find("pushEventData") != std::string::npos) 00063 { 00064 consumerIsProxyServer_ = true; 00065 } 00066 00067 // assign the consumer ID 00068 boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_); 00069 consumerId_ = rootId_; 00070 rootId_++; 00071 00072 if(han_==0) 00073 { 00074 edm::LogError("DQMConsumerPipe") << "Could not create curl handle"; 00075 //std::cout << "Could not create curl handle" << std::endl; 00076 // throw exception here when we can make the SM go to a fail state from 00077 // another thread 00078 } else { 00079 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream"); 00080 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary"); 00081 // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay 00082 // for pthttps but we don't need the Expect: 100 continue anyway 00083 headers_ = curl_slist_append(headers_, "Expect:"); 00084 setopt(han_, CURLOPT_HTTPHEADER, headers_); 00085 setopt(han_, CURLOPT_URL, consumerName_.c_str()); 00086 setopt(han_, CURLOPT_WRITEFUNCTION, func); 00087 // debug options 00088 //setopt(han_,CURLOPT_VERBOSE, 1); 00089 //setopt(han_,CURLOPT_TCP_NODELAY, 1); 00090 } 00091 }
DQMConsumerPipe::~DQMConsumerPipe | ( | ) |
DQMConsumerPipe destructor.
Definition at line 96 of file DQMConsumerPipe.cc.
References consumerId_, lat::endl(), FDEBUG, han_, and headers_.
00097 { 00098 FDEBUG(5) << "Executing destructor for DQM consumer pipe with ID = " << 00099 consumerId_ << std::endl; 00100 curl_slist_free_all(headers_); 00101 curl_easy_cleanup(han_); 00102 }
void DQMConsumerPipe::clearQueue | ( | ) |
Definition at line 320 of file DQMConsumerPipe.cc.
References eventQueue_, and eventQueueLock_.
Referenced by getDQMEvent().
00321 { 00322 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00323 eventQueue_.clear(); 00324 }
uint32 DQMConsumerPipe::getConsumerId | ( | ) | const |
Returns the consumer ID associated with this pipe.
Definition at line 107 of file DQMConsumerPipe.cc.
References consumerId_.
00108 { 00109 return consumerId_; 00110 }
std::string stor::DQMConsumerPipe::getConsumerName | ( | ) | [inline] |
Definition at line 63 of file DQMConsumerPipe.h.
References consumerName_.
00063 { return(consumerName_);}
boost::shared_ptr< std::vector< char > > DQMConsumerPipe::getDQMEvent | ( | ) |
Fetches the next event from this consumer pipe.
If there are no events in the pipe, an empty shared_ptr will be returned (ptr.get() == NULL).
Definition at line 201 of file DQMConsumerPipe.cc.
References clearQueue(), eventQueue_, eventQueueLock_, events_, isDisconnected(), isIdle(), lastEventRequestTime_, and NULL.
00202 { 00203 // clear out any stale event(s) 00204 if (isIdle() || isDisconnected()) 00205 { 00206 this->clearQueue(); 00207 } 00208 00209 // fetch the most recent event 00210 boost::shared_ptr< std::vector<char> > bufPtr; 00211 { 00212 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00213 if (! eventQueue_.empty()) 00214 { 00215 bufPtr = eventQueue_.front(); 00216 eventQueue_.pop_front(); 00217 ++events_; 00218 } 00219 } 00220 00221 // update the time of the most recent request 00222 lastEventRequestTime_ = time(NULL); 00223 00224 // return the event 00225 return bufPtr; 00226 }
int stor::DQMConsumerPipe::getEvents | ( | ) | [inline] |
std::string stor::DQMConsumerPipe::getHostName | ( | ) | [inline] |
time_t stor::DQMConsumerPipe::getLastEventRequestTime | ( | ) | [inline] |
Definition at line 66 of file DQMConsumerPipe.h.
References lastEventRequestTime_.
00066 { return(lastEventRequestTime_);}
int stor::DQMConsumerPipe::getPushEventFailures | ( | ) | [inline] |
Definition at line 64 of file DQMConsumerPipe.h.
References pushEventFailures_.
00064 { return(pushEventFailures_);}
void DQMConsumerPipe::initializeSelection | ( | ) |
Initializes the event selection for this consumer based on the list of available triggers stored in the specified InitMsgView and the request ParameterSet that was specified in the constructor.
Definition at line 117 of file DQMConsumerPipe.cc.
References consumerId_, lat::endl(), FDEBUG, and initializationDone.
00118 { 00119 FDEBUG(5) << "Initializing DQM consumer pipe, ID = " << 00120 consumerId_ << std::endl; 00121 00122 // no need for initialization yet 00123 initializationDone = true; 00124 00125 }
bool DQMConsumerPipe::isDisconnected | ( | ) | const |
Tests if the consumer has disconnected.
Definition at line 142 of file DQMConsumerPipe.cc.
References lastEventRequestTime_, NULL, and timeToDisconnectedState_.
Referenced by getDQMEvent().
00143 { 00144 time_t timeDiff = time(NULL) - lastEventRequestTime_; 00145 return (timeDiff >= timeToDisconnectedState_); 00146 }
bool DQMConsumerPipe::isIdle | ( | ) | const |
Tests if the consumer is idle (as opposed to active).
The idle state indicates that the consumer is still connected, but it hasn't requested an event in some time.
Definition at line 132 of file DQMConsumerPipe.cc.
References lastEventRequestTime_, NULL, timeToDisconnectedState_, and timeToIdleState_.
Referenced by getDQMEvent().
00133 { 00134 time_t timeDiff = time(NULL) - lastEventRequestTime_; 00135 return (timeDiff >= timeToIdleState_ && 00136 timeDiff < timeToDisconnectedState_); 00137 }
bool stor::DQMConsumerPipe::isProxyServer | ( | ) | const [inline] |
Definition at line 57 of file DQMConsumerPipe.h.
References consumerIsProxyServer_.
00057 { return consumerIsProxyServer_; }
bool DQMConsumerPipe::isReadyForEvent | ( | ) | const |
Tests if the consumer is ready for an event.
Definition at line 151 of file DQMConsumerPipe.cc.
References initializationDone, lastEventRequestTime_, NULL, and timeToIdleState_.
00152 { 00153 // 13-Oct-2006, KAB - we're not ready if we haven't been initialized 00154 if (! initializationDone) return false; 00155 00156 // for now, just test if we are in the active state 00157 time_t timeDiff = time(NULL) - lastEventRequestTime_; 00158 return (timeDiff < timeToIdleState_); 00159 }
bool DQMConsumerPipe::pushEvent | ( | ) | [private] |
Definition at line 228 of file DQMConsumerPipe.cc.
References TestMuL1L2Filter_cff::cerr, consumerName_, stor::ReadData::d_, data, lat::endl(), eventQueue_, eventQueueLock_, FDEBUG, stor::func(), han_, headers_, i, len, NULL, stor::setopt(), DQMEventMsgView::size(), and DQMEventMsgView::startAddress().
Referenced by putDQMEvent().
00229 { 00230 // fetch the most recent event 00231 boost::shared_ptr< std::vector<char> > bufPtr; 00232 { 00233 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00234 if (! eventQueue_.empty()) 00235 { 00236 bufPtr = eventQueue_.front(); 00237 eventQueue_.pop_front(); 00238 } 00239 } 00240 if (bufPtr.get() == NULL) 00241 { 00242 edm::LogError("pushEvent") << "========================================"; 00243 edm::LogError("pushEvent") << "pushEvent called with empty event queue!"; 00244 return false; 00245 } 00246 00247 // push the next event out to a push mode consumer (SMProxyServer) 00248 FDEBUG(5) << "pushing out DQMevent to " << consumerName_ << std::endl; 00249 stor::ReadData data; 00250 00251 data.d_.clear(); 00252 // check if curl handle was obtained (at ctor) if not try again 00253 if(han_==0) 00254 { 00255 han_ = curl_easy_init(); 00256 if(han_==0) 00257 { 00258 edm::LogError("pushEvent") << "Could not create curl handle"; 00259 return false; 00260 } 00261 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream"); 00262 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary"); 00263 // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay 00264 headers_ = curl_slist_append(headers_, "Expect:"); 00265 setopt(han_, CURLOPT_HTTPHEADER, headers_); 00266 setopt(han_, CURLOPT_URL, consumerName_.c_str()); 00267 setopt(han_, CURLOPT_WRITEFUNCTION, func); 00268 } 00269 setopt(han_,CURLOPT_WRITEDATA,&data); 00270 00271 // build the event message 00272 DQMEventMsgView msgView(&(*bufPtr)[0]); 00273 00274 // add the request message as a https post 00275 setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress()); 00276 setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size()); 00277 00278 // send the HTTP POST, read the reply 00279 // explicitly close connection when using pthttps transport or sometimes it hangs 00280 // because somtimes curl does not see that the connection was closed and tries to reuse it 00281 setopt(han_,CURLOPT_FORBID_REUSE, 1); 00282 CURLcode messageStatus = curl_easy_perform(han_); 00283 00284 if(messageStatus!=0) 00285 { 00286 cerr << "curl perform failed for pushDQMEvent" << endl; 00287 edm::LogError("pushEvent") << "curl perform failed for pushDQMEvent. " 00288 << "Could not register: probably XDAQ not running on Storage Manager" 00289 << " at " << consumerName_; 00290 return false; 00291 } 00292 // should really read the message to see if okay (if we had made one!) 00293 if(data.d_.length() == 0) 00294 { 00295 return true; 00296 } else { 00297 if(data.d_.length() > 0) { 00298 std::vector<char> buf(1024); 00299 int len = data.d_.length(); 00300 buf.resize(len); 00301 for (int i=0; i<len ; i++) buf[i] = data.d_[i]; 00302 const unsigned int MAX_DUMP_LENGTH = 1000; 00303 edm::LogError("pushEvent") << "========================================"; 00304 edm::LogError("pushEvent") << "Unexpected pushDQMEvent response!"; 00305 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00306 edm::LogError("pushEvent") << "Here is the raw text that was returned:"; 00307 edm::LogError("pushEvent") << data.d_; 00308 } 00309 else { 00310 edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH << 00311 " characters of the raw text that was returned:"; 00312 edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH)); 00313 } 00314 edm::LogError("pushEvent") << "========================================"; 00315 } 00316 } 00317 return false; 00318 }
void DQMConsumerPipe::putDQMEvent | ( | boost::shared_ptr< std::vector< char > > | bufPtr | ) |
Adds the specified event to this consumer pipe.
Definition at line 175 of file DQMConsumerPipe.cc.
References eventQueue_, eventQueueLock_, events_, lastEventRequestTime_, maxQueueSize_, NULL, pushEvent(), pushEventFailures_, and pushMode_.
00176 { 00177 // update the local pointer to the most recent event 00178 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00179 eventQueue_.push_back(bufPtr); 00180 while (eventQueue_.size() > maxQueueSize_) { 00181 eventQueue_.pop_front(); 00182 } 00183 // actually push out DQM data if this is a push mode consumer (SMProxyServer) 00184 if(pushMode_) { 00185 bool success = pushEvent(); 00186 // update the time of the most recent successful transaction 00187 if(!success) ++pushEventFailures_; 00188 else 00189 { 00190 lastEventRequestTime_ = time(NULL); 00191 ++events_; 00192 } 00193 } 00194 }
bool DQMConsumerPipe::wantsDQMEvent | ( | DQMEventMsgView const & | eventView | ) | const |
Tests if the consumer wants the specified event.
Definition at line 164 of file DQMConsumerPipe.cc.
References DQMEventMsgView::topFolderName(), and topFolderName_.
00165 { 00166 // for now, only allow one top folder selection or "*" 00167 std::string meansEverything = "*"; 00168 if(topFolderName_.compare(meansEverything) == 0) return true; 00169 else return (topFolderName_.compare(eventView.topFolderName()) == 0); 00170 }
uint32 stor::DQMConsumerPipe::consumerId_ [private] |
Definition at line 74 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), getConsumerId(), initializeSelection(), and ~DQMConsumerPipe().
Definition at line 79 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), and isProxyServer().
std::string stor::DQMConsumerPipe::consumerName_ [private] |
Definition at line 75 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), getConsumerName(), and pushEvent().
std::string stor::DQMConsumerPipe::consumerPriority_ [private] |
std::deque< boost::shared_ptr< std::vector<char> > > stor::DQMConsumerPipe::eventQueue_ [private] |
Definition at line 96 of file DQMConsumerPipe.h.
Referenced by clearQueue(), getDQMEvent(), pushEvent(), and putDQMEvent().
Definition at line 100 of file DQMConsumerPipe.h.
Referenced by clearQueue(), getDQMEvent(), pushEvent(), and putDQMEvent().
int stor::DQMConsumerPipe::events_ [private] |
Definition at line 80 of file DQMConsumerPipe.h.
Referenced by getDQMEvent(), getEvents(), and putDQMEvent().
CURL* stor::DQMConsumerPipe::han_ [private] |
Definition at line 71 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), pushEvent(), and ~DQMConsumerPipe().
struct curl_slist* stor::DQMConsumerPipe::headers_ [read, private] |
Definition at line 72 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), pushEvent(), and ~DQMConsumerPipe().
std::string stor::DQMConsumerPipe::hostName_ [private] |
Definition at line 88 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), initializeSelection(), and isReadyForEvent().
time_t stor::DQMConsumerPipe::lastEventRequestTime_ [private] |
Definition at line 85 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), getDQMEvent(), getLastEventRequestTime(), isDisconnected(), isIdle(), isReadyForEvent(), and putDQMEvent().
unsigned int stor::DQMConsumerPipe::maxQueueSize_ [private] |
unsigned int stor::DQMConsumerPipe::pushEventFailures_ [private] |
Definition at line 93 of file DQMConsumerPipe.h.
Referenced by getPushEventFailures(), and putDQMEvent().
bool stor::DQMConsumerPipe::pushMode_ [private] |
Definition at line 91 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), putDQMEvent(), and setPushMode().
uint32 DQMConsumerPipe::rootId_ = 1 [static, private] |
Initialize the static value for the root consumer id.
Definition at line 103 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe().
boost::mutex DQMConsumerPipe::rootIdLock_ [static, private] |
Initialize the static lock used to control access to the root ID.
Definition at line 104 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe().
Definition at line 84 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), isDisconnected(), and isIdle().
int stor::DQMConsumerPipe::timeToIdleState_ [private] |
Definition at line 83 of file DQMConsumerPipe.h.
Referenced by DQMConsumerPipe(), isIdle(), and isReadyForEvent().
std::string stor::DQMConsumerPipe::topFolderName_ [private] |