CMS 3D CMS Logo

stor::DQMConsumerPipe Class Reference

#include <EventFilter/StorageManager/interface/DQMConsumerPipe.h>

List of all members.

Public Member Functions

void clearQueue ()
 DQMConsumerPipe (std::string name, std::string priority, int activeTimeout, int idleTimeout, std::string folderName, std::string hostName, int queueSize)
 DQMConsumerPipe constructor.
uint32 getConsumerId () const
 Returns the consumer ID associated with this pipe.
std::string getConsumerName ()
boost::shared_ptr< std::vector< char > > getDQMEvent ()
 Fetches the next event from this consumer pipe.
int getEvents ()
std::string getHostName ()
time_t getLastEventRequestTime ()
int getPushEventFailures ()
void initializeSelection ()
 Initializes the event selection for this consumer based on the list of available triggers stored in the specified InitMsgView and the request ParameterSet that was specified in the constructor.
bool isDisconnected () const
 Tests if the consumer has disconnected.
bool isIdle () const
 Tests if the consumer is idle (as opposed to active).
bool isProxyServer () const
bool isReadyForEvent () const
 Tests if the consumer is ready for an event.
void putDQMEvent (boost::shared_ptr< std::vector< char > > bufPtr)
 Adds the specified event to this consumer pipe.
void setPushMode (bool mode)
bool wantsDQMEvent (DQMEventMsgView const &eventView) const
 Tests if the consumer wants the specified event.
 ~DQMConsumerPipe ()
 DQMConsumerPipe destructor.

Private Member Functions

bool pushEvent ()

Private Attributes

uint32 consumerId_
bool consumerIsProxyServer_
std::string consumerName_
std::string consumerPriority_
std::deque< boost::shared_ptr< std::vector< char > > > eventQueue_
boost::mutex eventQueueLock_
int events_
CURL * han_
struct curl_slist * headers_
std::string hostName_
bool initializationDone
time_t lastEventRequestTime_
unsigned int maxQueueSize_
unsigned int pushEventFailures_
bool pushMode_
int timeToDisconnectedState_
int timeToIdleState_
std::string topFolderName_

Static Private Attributes

static uint32 rootId_ = 1
 Initialize the static value for the root consumer id.
static boost::mutex rootIdLock_
 Initialize the static lock used to control access to the root ID.


Detailed Description

Definition at line 42 of file DQMConsumerPipe.h.


Constructor & Destructor Documentation

DQMConsumerPipe::DQMConsumerPipe ( std::string  name,
std::string  priority,
int  activeTimeout,
int  idleTimeout,
std::string  folderName,
std::string  hostName,
int  queueSize 
)

DQMConsumerPipe constructor.

Definition at line 36 of file DQMConsumerPipe.cc.

References consumerId_, consumerIsProxyServer_, consumerName_, consumerPriority_, stor::func(), han_, headers_, initializationDone, lastEventRequestTime_, NULL, pushMode_, rootId_, rootIdLock_, stor::setopt(), timeToDisconnectedState_, and timeToIdleState_.

00039                                                :
00040   han_(curl_easy_init()),
00041   headers_(),
00042   consumerName_(name),consumerPriority_(priority),
00043   topFolderName_(folderName),
00044   hostName_(hostName),
00045   events_(0),
00046   pushEventFailures_(0),
00047   maxQueueSize_(queueSize)
00048 {
00049   // initialize the time values we use for defining "states"
00050   timeToIdleState_ = activeTimeout;
00051   timeToDisconnectedState_ = activeTimeout + idleTimeout;
00052   lastEventRequestTime_ = time(NULL);
00053   initializationDone = false;
00054   pushMode_ = false;
00055   if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true;
00056 
00057   // determine if we're connected to a proxy server
00058   consumerIsProxyServer_ = false;
00059   //if (consumerName_ == PROXY_SERVER_NAME)
00060   if (consumerName_.find("urn") != std::string::npos &&
00061       consumerName_.find("xdaq") != std::string::npos &&
00062       consumerName_.find("pushEventData") != std::string::npos)
00063   {
00064     consumerIsProxyServer_ = true;
00065   }
00066 
00067   // assign the consumer ID
00068   boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_);
00069   consumerId_ = rootId_;
00070   rootId_++;
00071 
00072   if(han_==0)
00073   {
00074     edm::LogError("DQMConsumerPipe") << "Could not create curl handle";
00075     //std::cout << "Could not create curl handle" << std::endl;
00076     // throw exception here when we can make the SM go to a fail state from
00077     // another thread
00078   } else {
00079     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00080     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00081     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00082     // for pthttps but we don't need the Expect: 100 continue anyway
00083     headers_ = curl_slist_append(headers_, "Expect:");
00084     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00085     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00086     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00087     // debug options
00088     //setopt(han_,CURLOPT_VERBOSE, 1);
00089     //setopt(han_,CURLOPT_TCP_NODELAY, 1);
00090   }
00091 }

DQMConsumerPipe::~DQMConsumerPipe (  ) 

DQMConsumerPipe destructor.

Definition at line 96 of file DQMConsumerPipe.cc.

References consumerId_, lat::endl(), FDEBUG, han_, and headers_.

00097 {
00098   FDEBUG(5) << "Executing destructor for DQM consumer pipe with ID = " <<
00099     consumerId_ << std::endl;
00100   curl_slist_free_all(headers_);
00101   curl_easy_cleanup(han_);
00102 }


Member Function Documentation

void DQMConsumerPipe::clearQueue (  ) 

Definition at line 320 of file DQMConsumerPipe.cc.

References eventQueue_, and eventQueueLock_.

Referenced by getDQMEvent().

00321 {
00322   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00323   eventQueue_.clear();
00324 }

uint32 DQMConsumerPipe::getConsumerId (  )  const

Returns the consumer ID associated with this pipe.

Definition at line 107 of file DQMConsumerPipe.cc.

References consumerId_.

00108 {
00109   return consumerId_;
00110 }

std::string stor::DQMConsumerPipe::getConsumerName (  )  [inline]

Definition at line 63 of file DQMConsumerPipe.h.

References consumerName_.

00063 { return(consumerName_);}

boost::shared_ptr< std::vector< char > > DQMConsumerPipe::getDQMEvent (  ) 

Fetches the next event from this consumer pipe.

If there are no events in the pipe, an empty shared_ptr will be returned (ptr.get() == NULL).

Definition at line 201 of file DQMConsumerPipe.cc.

References clearQueue(), eventQueue_, eventQueueLock_, events_, isDisconnected(), isIdle(), lastEventRequestTime_, and NULL.

00202 {
00203   // clear out any stale event(s)
00204   if (isIdle() || isDisconnected())
00205   {
00206     this->clearQueue();
00207   }
00208 
00209   // fetch the most recent event
00210   boost::shared_ptr< std::vector<char> > bufPtr;
00211   {
00212     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00213     if (! eventQueue_.empty())
00214     {
00215       bufPtr = eventQueue_.front();
00216       eventQueue_.pop_front();
00217       ++events_;
00218     }
00219   }
00220 
00221   // update the time of the most recent request
00222   lastEventRequestTime_ = time(NULL);
00223 
00224   // return the event
00225   return bufPtr;
00226 }

int stor::DQMConsumerPipe::getEvents (  )  [inline]

Definition at line 65 of file DQMConsumerPipe.h.

References events_.

00065 { return(events_);}

std::string stor::DQMConsumerPipe::getHostName (  )  [inline]

Definition at line 67 of file DQMConsumerPipe.h.

References hostName_.

00067 { return(hostName_);}

time_t stor::DQMConsumerPipe::getLastEventRequestTime (  )  [inline]

Definition at line 66 of file DQMConsumerPipe.h.

References lastEventRequestTime_.

00066 { return(lastEventRequestTime_);}

int stor::DQMConsumerPipe::getPushEventFailures (  )  [inline]

Definition at line 64 of file DQMConsumerPipe.h.

References pushEventFailures_.

00064 { return(pushEventFailures_);}

void DQMConsumerPipe::initializeSelection (  ) 

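Initializes the event selection for this consumer based on the list of available triggers stored in the specified InitMsgView and the request ParameterSet that was specified in the constructor. Note: the implementation listed below takes no InitMsgView or ParameterSet argument; it only emits a debug message and sets initializationDone to true.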

Definition at line 117 of file DQMConsumerPipe.cc.

References consumerId_, lat::endl(), FDEBUG, and initializationDone.

00118 {
00119   FDEBUG(5) << "Initializing DQM consumer pipe, ID = " <<
00120     consumerId_ << std::endl;
00121 
00122   // no need for initialization yet
00123   initializationDone = true;
00124 
00125 }

bool DQMConsumerPipe::isDisconnected (  )  const

Tests if the consumer has disconnected.

Definition at line 142 of file DQMConsumerPipe.cc.

References lastEventRequestTime_, NULL, and timeToDisconnectedState_.

Referenced by getDQMEvent().

00143 {
00144   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00145   return (timeDiff >= timeToDisconnectedState_);
00146 }

bool DQMConsumerPipe::isIdle (  )  const

Tests if the consumer is idle (as opposed to active).

The idle state indicates that the consumer is still connected, but it hasn't requested an event in some time.

Definition at line 132 of file DQMConsumerPipe.cc.

References lastEventRequestTime_, NULL, timeToDisconnectedState_, and timeToIdleState_.

Referenced by getDQMEvent().

00133 {
00134   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00135   return (timeDiff >= timeToIdleState_ &&
00136           timeDiff <  timeToDisconnectedState_);
00137 }

bool stor::DQMConsumerPipe::isProxyServer (  )  const [inline]

Definition at line 57 of file DQMConsumerPipe.h.

References consumerIsProxyServer_.

00057 { return consumerIsProxyServer_; }

bool DQMConsumerPipe::isReadyForEvent (  )  const

Tests if the consumer is ready for an event.

Definition at line 151 of file DQMConsumerPipe.cc.

References initializationDone, lastEventRequestTime_, NULL, and timeToIdleState_.

00152 {
00153   // 13-Oct-2006, KAB - we're not ready if we haven't been initialized
00154   if (! initializationDone) return false;
00155 
00156   // for now, just test if we are in the active state
00157   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00158   return (timeDiff < timeToIdleState_);
00159 }

bool DQMConsumerPipe::pushEvent (  )  [private]

Definition at line 228 of file DQMConsumerPipe.cc.

References TestMuL1L2Filter_cff::cerr, consumerName_, stor::ReadData::d_, data, lat::endl(), eventQueue_, eventQueueLock_, FDEBUG, stor::func(), han_, headers_, i, len, NULL, stor::setopt(), DQMEventMsgView::size(), and DQMEventMsgView::startAddress().

Referenced by putDQMEvent().

00229 {
00230   // fetch the most recent event
00231   boost::shared_ptr< std::vector<char> > bufPtr;
00232   {
00233     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00234     if (! eventQueue_.empty())
00235     {
00236       bufPtr = eventQueue_.front();
00237       eventQueue_.pop_front();
00238     }
00239   }
00240   if (bufPtr.get() == NULL)
00241   {
00242     edm::LogError("pushEvent") << "========================================";
00243     edm::LogError("pushEvent") << "pushEvent called with empty event queue!";
00244     return false;
00245   }
00246 
00247   // push the next event out to a push mode consumer (SMProxyServer)
00248   FDEBUG(5) << "pushing out DQMevent to " << consumerName_ << std::endl;
00249   stor::ReadData data;
00250 
00251   data.d_.clear();
00252   // check if curl handle was obtained (at ctor) if not try again
00253   if(han_==0)
00254   {
00255     han_ = curl_easy_init();
00256     if(han_==0)
00257     {
00258       edm::LogError("pushEvent") << "Could not create curl handle";
00259       return false;
00260     }
00261     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00262     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00263     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00264     headers_ = curl_slist_append(headers_, "Expect:");
00265     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00266     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00267     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00268   }
00269   setopt(han_,CURLOPT_WRITEDATA,&data);
00270 
00271   // build the event message
00272   DQMEventMsgView msgView(&(*bufPtr)[0]);
00273 
00274   // add the request message as an HTTPS POST
00275   setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress());
00276   setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size());
00277 
00278   // send the HTTP POST, read the reply
00279   // explicitly close connection when using pthttps transport or sometimes it hangs
00280   // because sometimes curl does not see that the connection was closed and tries to reuse it
00281   setopt(han_,CURLOPT_FORBID_REUSE, 1);
00282   CURLcode messageStatus = curl_easy_perform(han_);
00283 
00284   if(messageStatus!=0)
00285   {
00286     cerr << "curl perform failed for pushDQMEvent" << endl;
00287     edm::LogError("pushEvent") << "curl perform failed for pushDQMEvent. "
00288         << "Could not register: probably XDAQ not running on Storage Manager"
00289         << " at " << consumerName_;
00290     return false;
00291   }
00292   // should really read the message to see if okay (if we had made one!)
00293   if(data.d_.length() == 0)
00294   {
00295     return true;
00296   } else {
00297     if(data.d_.length() > 0) {
00298       std::vector<char> buf(1024);
00299       int len = data.d_.length();
00300       buf.resize(len);
00301       for (int i=0; i<len ; i++) buf[i] = data.d_[i];
00302       const unsigned int MAX_DUMP_LENGTH = 1000;
00303       edm::LogError("pushEvent") << "========================================";
00304       edm::LogError("pushEvent") << "Unexpected pushDQMEvent response!";
00305       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00306         edm::LogError("pushEvent") << "Here is the raw text that was returned:";
00307         edm::LogError("pushEvent") << data.d_;
00308       }
00309       else {
00310         edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH <<
00311           " characters of the raw text that was returned:";
00312         edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00313       }
00314       edm::LogError("pushEvent") << "========================================";
00315     }
00316   }
00317   return false;
00318 }

void DQMConsumerPipe::putDQMEvent ( boost::shared_ptr< std::vector< char > >  bufPtr  ) 

Adds the specified event to this consumer pipe.

Definition at line 175 of file DQMConsumerPipe.cc.

References eventQueue_, eventQueueLock_, events_, lastEventRequestTime_, maxQueueSize_, NULL, pushEvent(), pushEventFailures_, and pushMode_.

00176 {
00177   // update the local pointer to the most recent event
00178   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00179   eventQueue_.push_back(bufPtr);
00180   while (eventQueue_.size() > maxQueueSize_) {
00181     eventQueue_.pop_front();
00182   }
00183   // actually push out DQM data if this is a push mode consumer (SMProxyServer)
00184   if(pushMode_) {
00185     bool success = pushEvent();
00186     // update the time of the most recent successful transaction
00187     if(!success) ++pushEventFailures_;
00188     else 
00189     {
00190       lastEventRequestTime_ = time(NULL);
00191       ++events_;
00192     }
00193   }
00194 }

void stor::DQMConsumerPipe::setPushMode ( bool  mode  )  [inline]

Definition at line 61 of file DQMConsumerPipe.h.

References pushMode_.

00061 { pushMode_ = mode; }

bool DQMConsumerPipe::wantsDQMEvent ( DQMEventMsgView const &  eventView  )  const

Tests if the consumer wants the specified event.

Definition at line 164 of file DQMConsumerPipe.cc.

References DQMEventMsgView::topFolderName(), and topFolderName_.

00165 {
00166   // for now, only allow one top folder selection or "*"
00167   std::string meansEverything = "*";
00168   if(topFolderName_.compare(meansEverything) == 0) return true;
00169   else return (topFolderName_.compare(eventView.topFolderName()) == 0);
00170 }


Member Data Documentation

uint32 stor::DQMConsumerPipe::consumerId_ [private]

Definition at line 74 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), getConsumerId(), initializeSelection(), and ~DQMConsumerPipe().

bool stor::DQMConsumerPipe::consumerIsProxyServer_ [private]

Definition at line 79 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), and isProxyServer().

std::string stor::DQMConsumerPipe::consumerName_ [private]

Definition at line 75 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), getConsumerName(), and pushEvent().

std::string stor::DQMConsumerPipe::consumerPriority_ [private]

Definition at line 76 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe().

std::deque< boost::shared_ptr< std::vector<char> > > stor::DQMConsumerPipe::eventQueue_ [private]

Definition at line 96 of file DQMConsumerPipe.h.

Referenced by clearQueue(), getDQMEvent(), pushEvent(), and putDQMEvent().

boost::mutex stor::DQMConsumerPipe::eventQueueLock_ [private]

Definition at line 100 of file DQMConsumerPipe.h.

Referenced by clearQueue(), getDQMEvent(), pushEvent(), and putDQMEvent().

int stor::DQMConsumerPipe::events_ [private]

Definition at line 80 of file DQMConsumerPipe.h.

Referenced by getDQMEvent(), getEvents(), and putDQMEvent().

CURL* stor::DQMConsumerPipe::han_ [private]

Definition at line 71 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), pushEvent(), and ~DQMConsumerPipe().

struct curl_slist* stor::DQMConsumerPipe::headers_ [read, private]

Definition at line 72 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), pushEvent(), and ~DQMConsumerPipe().

std::string stor::DQMConsumerPipe::hostName_ [private]

Definition at line 78 of file DQMConsumerPipe.h.

Referenced by getHostName().

bool stor::DQMConsumerPipe::initializationDone [private]

Definition at line 88 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), initializeSelection(), and isReadyForEvent().

time_t stor::DQMConsumerPipe::lastEventRequestTime_ [private]

Definition at line 85 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), getDQMEvent(), getLastEventRequestTime(), isDisconnected(), isIdle(), isReadyForEvent(), and putDQMEvent().

unsigned int stor::DQMConsumerPipe::maxQueueSize_ [private]

Definition at line 97 of file DQMConsumerPipe.h.

Referenced by putDQMEvent().

unsigned int stor::DQMConsumerPipe::pushEventFailures_ [private]

Definition at line 93 of file DQMConsumerPipe.h.

Referenced by getPushEventFailures(), and putDQMEvent().

bool stor::DQMConsumerPipe::pushMode_ [private]

Definition at line 91 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), putDQMEvent(), and setPushMode().

uint32 DQMConsumerPipe::rootId_ = 1 [static, private]

Initialize the static value for the root consumer id.

Definition at line 103 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe().

boost::mutex DQMConsumerPipe::rootIdLock_ [static, private]

Initialize the static lock used to control access to the root ID.

Definition at line 104 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe().

int stor::DQMConsumerPipe::timeToDisconnectedState_ [private]

Definition at line 84 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), isDisconnected(), and isIdle().

int stor::DQMConsumerPipe::timeToIdleState_ [private]

Definition at line 83 of file DQMConsumerPipe.h.

Referenced by DQMConsumerPipe(), isIdle(), and isReadyForEvent().

std::string stor::DQMConsumerPipe::topFolderName_ [private]

Definition at line 77 of file DQMConsumerPipe.h.

Referenced by wantsDQMEvent().


The documentation for this class was generated from the following files:
- EventFilter/StorageManager/interface/DQMConsumerPipe.h
- DQMConsumerPipe.cc
Generated on Tue Jun 9 18:52:50 2009 for CMSSW by  doxygen 1.5.4