CMS 3D CMS Logo

stor::ConsumerPipe Class Reference

#include <EventFilter/StorageManager/interface/ConsumerPipe.h>

List of all members.

Public Types

enum  STATS_SAMPLE_TYPE { QUEUED_EVENTS = 10, SERVED_EVENTS = 11, DESIRED_EVENTS = 12 }
enum  STATS_TIME_FRAME { SHORT_TERM = 0, LONG_TERM = 1 }

Public Member Functions

void clearQueue ()
void clearRegistryWarning ()
 ConsumerPipe (std::string name, std::string priority, int activeTimeout, int idleTimeout, Strings triggerSelection, double rateRequest, std::string hltOutputSelection, std::string hostName, int queueSize)
 ConsumerPipe constructor.
double getAverageQueueSize (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime())
 Returns the average queue size for the specified statistics types (short term vs.
uint32 getConsumerId () const
 Returns the consumer ID associated with this pipe.
std::string getConsumerName ()
double getDataRate (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime())
 Returns the data rate for the specified statistics types (short term vs.
double getDuration (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime())
 Returns the duration (in seconds) for the specified statistics types (short term vs.
boost::shared_ptr< std::vector
< char > > 
getEvent ()
 Fetches the next event from this consumer pipe.
long long getEventCount (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime())
 Returns the number of events for the specified statistics types (short term vs.
double getEventRate (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime())
 Returns the rate of events for the specified statistics types (short term vs.
unsigned int getEvents ()
std::string getHLTOutputSelection () const
std::string getHostName ()
time_t getLastEventRequestTime ()
unsigned int getPushEventFailures ()
double getRateRequest () const
std::vector< char > getRegistryWarning ()
std::vector< std::string > getTriggerRequest () const
Strings getTriggerSelection () const
bool hasRegistryWarning () const
void initializeSelection (Strings const &fullTriggerList, uint32 outputModuleId)
 Initializes the event selection for this consumer based on the specified full list of triggers and the request ParameterSet that was specified in the constructor.
bool isActive () const
 Tests if the consumer is active.
bool isDisconnected () const
 Tests if the consumer has disconnected.
bool isIdle () const
 Tests if the consumer is idle (as opposed to active).
bool isProxyServer () const
bool isReadyForEvent (double currentTime=BaseCounter::getCurrentTime()) const
 Tests if the consumer is ready for an event.
void putEvent (boost::shared_ptr< std::vector< char > > bufPtr)
 Adds the specified event to this consumer pipe.
void setPushMode (bool mode)
void setRegistryWarning (std::vector< char > const &message)
void setRegistryWarning (std::string const &message)
bool wantsEvent (EventMsgView const &eventView) const
 Tests if the consumer wants the specified event.
void wasConsidered (double currentTime=BaseCounter::getCurrentTime())
 Tells the ConsumerPipe that an event was considered or accepted or queued.
 ~ConsumerPipe ()
 ConsumerPipe destructor.

Static Public Attributes

static const double MAX_ACCEPT_INTERVAL = 86400.0
 Initialize the maximum accept interval.

Private Member Functions

bool pushEvent ()

Private Attributes

uint32 consumerId_
bool consumerIsProxyServer_
std::string consumerName_
std::string consumerPriority_
std::deque< boost::shared_ptr
< std::vector< char > > > 
eventQueue_
boost::mutex eventQueueLock_
int events_
boost::shared_ptr
< edm::EventSelector
eventSelector_
CURL * han_
struct curl_slist * headers_
uint32 hltOutputModuleId_
std::string hltOutputSelection_
std::string hostName_
bool initializationDone
double lastConsideredEventTime_
time_t lastEventRequestTime_
boost::shared_ptr< ForeverCounterlongTermDesiredCounter_
boost::shared_ptr< ForeverCounterlongTermQueuedCounter_
boost::shared_ptr< ForeverCounterlongTermServedCounter_
boost::shared_ptr< ForeverCounterltQueueSizeWhenDesiredCounter_
boost::shared_ptr< ForeverCounterltQueueSizeWhenQueuedCounter_
unsigned int maxQueueSize_
double minTimeBetweenEvents_
unsigned int pushEventFailures_
bool pushMode_
double rateRequest_
boost::shared_ptr
< RollingSampleCounter
rateRequestCounter_
std::vector< char > registryWarningMessage_
bool registryWarningWasReported_
boost::shared_ptr
< RollingIntervalCounter
shortTermDesiredCounter_
boost::shared_ptr
< RollingIntervalCounter
shortTermQueuedCounter_
boost::shared_ptr
< RollingIntervalCounter
shortTermServedCounter_
boost::shared_ptr
< RollingIntervalCounter
stQueueSizeWhenDesiredCounter_
boost::shared_ptr
< RollingIntervalCounter
stQueueSizeWhenQueuedCounter_
int timeToDisconnectedState_
int timeToIdleState_
Strings triggerSelection_

Static Private Attributes

static uint32 rootId_ = 1
 Initialize the static value for the root consumer id.
static boost::mutex rootIdLock_
 Initialize the static lock used to control access to the root ID.


Detailed Description

Definition at line 49 of file ConsumerPipe.h.


Member Enumeration Documentation

enum stor::ConsumerPipe::STATS_SAMPLE_TYPE

Enumerator:
QUEUED_EVENTS 
SERVED_EVENTS 
DESIRED_EVENTS 

Definition at line 53 of file ConsumerPipe.h.

00053                            { QUEUED_EVENTS = 10, SERVED_EVENTS = 11,
00054                              DESIRED_EVENTS = 12 };

enum stor::ConsumerPipe::STATS_TIME_FRAME

Enumerator:
SHORT_TERM 
LONG_TERM 

Definition at line 52 of file ConsumerPipe.h.

00052 { SHORT_TERM = 0, LONG_TERM = 1 };


Constructor & Destructor Documentation

ConsumerPipe::ConsumerPipe ( std::string  name,
std::string  priority,
int  activeTimeout,
int  idleTimeout,
Strings  triggerSelection,
double  rateRequest,
std::string  hltOutputSelection,
std::string  hostName,
int  queueSize 
)

ConsumerPipe constructor.

Definition at line 39 of file ConsumerPipe.cc.

References consumerId_, consumerIsProxyServer_, consumerName_, consumerPriority_, stor::func(), stor::BaseCounter::getCurrentTime(), han_, headers_, stor::RollingSampleCounter::INCLUDE_SAMPLES_IMMEDIATELY, initializationDone, lastConsideredEventTime_, lastEventRequestTime_, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, ltQueueSizeWhenDesiredCounter_, ltQueueSizeWhenQueuedCounter_, MAX_ACCEPT_INTERVAL, minTimeBetweenEvents_, NULL, pushMode_, rateRequest_, rateRequestCounter_, registryWarningWasReported_, rootId_, rootIdLock_, stor::setopt(), shortTermDesiredCounter_, shortTermQueuedCounter_, shortTermServedCounter_, stQueueSizeWhenDesiredCounter_, stQueueSizeWhenQueuedCounter_, timeToDisconnectedState_, and timeToIdleState_.

00043                                                              :
00044   han_(curl_easy_init()),
00045   headers_(),
00046   consumerName_(name),consumerPriority_(priority),
00047   events_(0),
00048   triggerSelection_(triggerSelection),
00049   rateRequest_(rateRequest),
00050   hltOutputSelection_(hltOutputSelection),
00051   hostName_(hostName),
00052   pushEventFailures_(0),
00053   maxQueueSize_(queueSize)
00054 {
00055   // initialize the time values we use for defining "states"
00056   timeToIdleState_ = activeTimeout;
00057   timeToDisconnectedState_ = activeTimeout + idleTimeout;
00058   lastEventRequestTime_ = time(NULL);
00059   initializationDone = false;
00060   pushMode_ = false;
00061   if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true;
00062   registryWarningWasReported_ = false;
00063 
00064   // determine if we're connected to a proxy server
00065   consumerIsProxyServer_ = false;
00066   //if (consumerName_ == PROXY_SERVER_NAME)
00067   if (consumerName_.find("urn") != std::string::npos &&
00068       consumerName_.find("xdaq") != std::string::npos &&
00069       consumerName_.find("pushEventData") != std::string::npos)
00070   {
00071     consumerIsProxyServer_ = true;
00072   }
00073 
00074   // assign the consumer ID
00075   boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_);
00076   consumerId_ = rootId_;
00077   rootId_++;
00078 
00079   if(han_==0)
00080   {
00081     edm::LogError("ConsumerPipe") << "Could not create curl handle";
00082     //std::cout << "Could not create curl handle" << std::endl;
00083     // throw exception here when we can make the SM go to a fail state from
00084     // another thread
00085   } else {
00086     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00087     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00088     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00089     // for pthttps but we don't need the Expect: 100 continue anyway
00090     headers_ = curl_slist_append(headers_, "Expect:");
00091     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00092     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00093     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00094     // debug options
00095     //setopt(han_,CURLOPT_VERBOSE, 1);
00096     //setopt(han_,CURLOPT_TCP_NODELAY, 1);
00097   }
00098 
00099   // determine the amount of time that we need to wait between accepted
00100   // events.  The request rate specified to this constructor is
00101   // converted to an interval that is used internally, and the interval
00102   // is required to be somewhat reasonable.
00103   if (rateRequest_ < (1.0 / MAX_ACCEPT_INTERVAL))
00104   {
00105     minTimeBetweenEvents_ = MAX_ACCEPT_INTERVAL;
00106   }
00107   else
00108   {
00109     minTimeBetweenEvents_ = 1.0 / rateRequest_;  // seconds
00110   }
00111   lastConsideredEventTime_ = BaseCounter::getCurrentTime();
00112   rateRequestCounter_.reset(new RollingSampleCounter(50,1,60,RollingSampleCounter::INCLUDE_SAMPLES_IMMEDIATELY));
00113 
00114   // initialize the counters that we use for statistics
00115   longTermDesiredCounter_.reset(new ForeverCounter());
00116   shortTermDesiredCounter_.reset(new RollingIntervalCounter(180,5,20));
00117   longTermQueuedCounter_.reset(new ForeverCounter());
00118   shortTermQueuedCounter_.reset(new RollingIntervalCounter(180,5,20));
00119   longTermServedCounter_.reset(new ForeverCounter());
00120   shortTermServedCounter_.reset(new RollingIntervalCounter(180,5,20));
00121 
00122   ltQueueSizeWhenDesiredCounter_.reset(new ForeverCounter());
00123   stQueueSizeWhenDesiredCounter_.reset(new RollingIntervalCounter(180,5,20));
00124   ltQueueSizeWhenQueuedCounter_.reset(new ForeverCounter());
00125   stQueueSizeWhenQueuedCounter_.reset(new RollingIntervalCounter(180,5,20));
00126 }

ConsumerPipe::~ConsumerPipe (  ) 

ConsumerPipe destructor.

Definition at line 131 of file ConsumerPipe.cc.

References consumerId_, lat::endl(), FDEBUG, han_, and headers_.

00132 {
00133   FDEBUG(5) << "Executing destructor for consumer pipe with ID = " <<
00134     consumerId_ << std::endl;
00135   curl_slist_free_all(headers_);
00136   curl_easy_cleanup(han_);
00137 }


Member Function Documentation

void ConsumerPipe::clearQueue (  ) 

Definition at line 456 of file ConsumerPipe.cc.

References eventQueue_, and eventQueueLock_.

Referenced by getEvent().

00457 {
00458   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00459   eventQueue_.clear();
00460 }

void stor::ConsumerPipe::clearRegistryWarning (  )  [inline]

Definition at line 89 of file ConsumerPipe.h.

References registryWarningWasReported_.

00089 { registryWarningWasReported_ = false; }

double ConsumerPipe::getAverageQueueSize ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_SAMPLE_TYPE  sampleType = QUEUED_EVENTS,
double  currentTime = BaseCounter::getCurrentTime() 
)

Returns the average queue size for the specified statistics types (short term vs.

long term; desired vs. queued). The queue size is sampled before any additional events are added. For example, the average queue size for "queued" events is the size before each new event is queued.

Definition at line 622 of file ConsumerPipe.cc.

References ltQueueSizeWhenDesiredCounter_, ltQueueSizeWhenQueuedCounter_, QUEUED_EVENTS, SHORT_TERM, stQueueSizeWhenDesiredCounter_, and stQueueSizeWhenQueuedCounter_.

00625 {
00626   if (timeFrame == SHORT_TERM) {
00627     if (sampleType == QUEUED_EVENTS) {
00628       return stQueueSizeWhenQueuedCounter_->getValueAverage(currentTime);
00629     }
00630     else {
00631       return stQueueSizeWhenDesiredCounter_->getValueAverage(currentTime);
00632     }
00633   }
00634   else {
00635     if (sampleType == QUEUED_EVENTS) {
00636       return ltQueueSizeWhenQueuedCounter_->getValueAverage();
00637     }
00638     else {
00639       return ltQueueSizeWhenDesiredCounter_->getValueAverage();
00640     }
00641   }
00642 }

uint32 ConsumerPipe::getConsumerId (  )  const

Returns the consumer ID associated with this pipe.

Definition at line 142 of file ConsumerPipe.cc.

References consumerId_.

00143 {
00144   return consumerId_;
00145 }

std::string stor::ConsumerPipe::getConsumerName (  )  [inline]

Definition at line 80 of file ConsumerPipe.h.

References consumerName_.

00080 { return(consumerName_);}

double ConsumerPipe::getDataRate ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_SAMPLE_TYPE  sampleType = QUEUED_EVENTS,
double  currentTime = BaseCounter::getCurrentTime() 
)

Returns the data rate for the specified statistics types (short term vs.

long term; desired vs. queued vs. served).

Definition at line 553 of file ConsumerPipe.cc.

References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.

00556 {
00557   if (timeFrame == SHORT_TERM) {
00558     if (sampleType == QUEUED_EVENTS) {
00559       return shortTermQueuedCounter_->getValueRate(currentTime);
00560     }
00561     else if (sampleType == DESIRED_EVENTS) {
00562       return shortTermDesiredCounter_->getValueRate(currentTime);
00563     }
00564     else {
00565       return shortTermServedCounter_->getValueRate(currentTime);
00566     }
00567   }
00568   else {
00569     if (sampleType == QUEUED_EVENTS) {
00570       return longTermQueuedCounter_->getValueRate(currentTime);
00571     }
00572     else if (sampleType == DESIRED_EVENTS) {
00573       return longTermDesiredCounter_->getValueRate(currentTime);
00574     }
00575     else {
00576       return longTermServedCounter_->getValueRate(currentTime);
00577     }
00578   }
00579 }

double ConsumerPipe::getDuration ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_SAMPLE_TYPE  sampleType = QUEUED_EVENTS,
double  currentTime = BaseCounter::getCurrentTime() 
)

Returns the duration (in seconds) for the specified statistics types (short term vs.

long term; desired vs. queued vs. served). "Duration" here means the length of time in which the specified statistics have been collected.

Definition at line 587 of file ConsumerPipe.cc.

References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.

00590 {
00591   if (timeFrame == SHORT_TERM) {
00592     if (sampleType == QUEUED_EVENTS) {
00593       return shortTermQueuedCounter_->getDuration(currentTime);
00594     }
00595     else if (sampleType == DESIRED_EVENTS) {
00596       return shortTermDesiredCounter_->getDuration(currentTime);
00597     }
00598     else {
00599       return shortTermServedCounter_->getDuration(currentTime);
00600     }
00601   }
00602   else {
00603     if (sampleType == QUEUED_EVENTS) {
00604       return longTermQueuedCounter_->getDuration(currentTime);
00605     }
00606     else if (sampleType == DESIRED_EVENTS) {
00607       return longTermDesiredCounter_->getDuration(currentTime);
00608     }
00609     else {
00610       return longTermServedCounter_->getDuration(currentTime);
00611     }
00612   }
00613 }

boost::shared_ptr< std::vector< char > > ConsumerPipe::getEvent (  ) 

Fetches the next event from this consumer pipe.

If there are no events in the pipe, an empty shared_ptr will be returned (ptr.get() == NULL).

Definition at line 331 of file ConsumerPipe.cc.

References clearQueue(), eventQueue_, eventQueueLock_, events_, isDisconnected(), isIdle(), lastEventRequestTime_, longTermServedCounter_, NULL, and shortTermServedCounter_.

00332 {
00333   // 25-Aug-2005, KAB: clear out any stale event(s)
00334   if (isIdle() || isDisconnected())
00335   {
00336     this->clearQueue();
00337   }
00338 
00339   // fetch the most recent event
00340   boost::shared_ptr< std::vector<char> > bufPtr;
00341   {
00342     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00343     if (! eventQueue_.empty())
00344     {
00345       bufPtr = eventQueue_.front();
00346       eventQueue_.pop_front();
00347 
00348       // add the event to our statistics for "served" events
00349       ++events_;
00350       double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0;
00351       longTermServedCounter_->addSample(sizeInMB);
00352       shortTermServedCounter_->addSample(sizeInMB);
00353     }
00354   }
00355 
00356   // update the time of the most recent request
00357   lastEventRequestTime_ = time(NULL);
00358 
00359   // return the event
00360   return bufPtr;
00361 }

long long ConsumerPipe::getEventCount ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_SAMPLE_TYPE  sampleType = QUEUED_EVENTS,
double  currentTime = BaseCounter::getCurrentTime() 
)

Returns the number of events for the specified statistics types (short term vs.

long term; desired vs. queued vs. served).

Definition at line 489 of file ConsumerPipe.cc.

References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.

00492 {
00493   if (timeFrame == SHORT_TERM) {
00494     if (sampleType == QUEUED_EVENTS) {
00495       return shortTermQueuedCounter_->getSampleCount(currentTime);
00496     }
00497     else if (sampleType == DESIRED_EVENTS) {
00498       return shortTermDesiredCounter_->getSampleCount(currentTime);
00499     }
00500     else {
00501       return shortTermServedCounter_->getSampleCount(currentTime);
00502     }
00503   }
00504   else {
00505     if (sampleType == QUEUED_EVENTS) {
00506       return longTermQueuedCounter_->getSampleCount();
00507     }
00508     else if (sampleType == DESIRED_EVENTS) {
00509       return longTermDesiredCounter_->getSampleCount();
00510     }
00511     else {
00512       return longTermServedCounter_->getSampleCount();
00513     }
00514   }
00515 }

double ConsumerPipe::getEventRate ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_SAMPLE_TYPE  sampleType = QUEUED_EVENTS,
double  currentTime = BaseCounter::getCurrentTime() 
)

Returns the rate of events for the specified statistics types (short term vs.

long term; desired vs. queued vs. served).

Definition at line 521 of file ConsumerPipe.cc.

References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.

00524 {
00525   if (timeFrame == SHORT_TERM) {
00526     if (sampleType == QUEUED_EVENTS) {
00527       return shortTermQueuedCounter_->getSampleRate(currentTime);
00528     }
00529     else if (sampleType == DESIRED_EVENTS) {
00530       return shortTermDesiredCounter_->getSampleRate(currentTime);
00531     }
00532     else {
00533       return shortTermServedCounter_->getSampleRate(currentTime);
00534     }
00535   }
00536   else {
00537     if (sampleType == QUEUED_EVENTS) {
00538       return longTermQueuedCounter_->getSampleRate(currentTime);
00539     }
00540     else if (sampleType == DESIRED_EVENTS) {
00541       return longTermDesiredCounter_->getSampleRate(currentTime);
00542     }
00543     else {
00544       return longTermServedCounter_->getSampleRate(currentTime);
00545     }
00546   }
00547 }

unsigned int stor::ConsumerPipe::getEvents (  )  [inline]

Definition at line 82 of file ConsumerPipe.h.

References events_.

00082 { return(events_);}

std::string stor::ConsumerPipe::getHLTOutputSelection (  )  const [inline]

Definition at line 108 of file ConsumerPipe.h.

References hltOutputSelection_.

00108 { return hltOutputSelection_; }

std::string stor::ConsumerPipe::getHostName (  )  [inline]

Definition at line 84 of file ConsumerPipe.h.

References hostName_.

00084 { return(hostName_);}

time_t stor::ConsumerPipe::getLastEventRequestTime (  )  [inline]

Definition at line 83 of file ConsumerPipe.h.

References lastEventRequestTime_.

00083 { return(lastEventRequestTime_);}

unsigned int stor::ConsumerPipe::getPushEventFailures (  )  [inline]

Definition at line 81 of file ConsumerPipe.h.

References pushEventFailures_.

00081 { return(pushEventFailures_);}

double stor::ConsumerPipe::getRateRequest (  )  const [inline]

Definition at line 107 of file ConsumerPipe.h.

References rateRequest_.

00107 { return rateRequest_; }

std::vector<char> stor::ConsumerPipe::getRegistryWarning (  )  [inline]

Definition at line 88 of file ConsumerPipe.h.

References registryWarningMessage_.

00088 { return registryWarningMessage_; }

std::vector< std::string > ConsumerPipe::getTriggerRequest (  )  const

Definition at line 462 of file ConsumerPipe.cc.

References triggerSelection_.

00463 {
00464   return triggerSelection_;
00465 }

Strings stor::ConsumerPipe::getTriggerSelection (  )  const [inline]

Definition at line 106 of file ConsumerPipe.h.

References triggerSelection_.

00106 { return triggerSelection_; }

bool stor::ConsumerPipe::hasRegistryWarning (  )  const [inline]

Definition at line 73 of file ConsumerPipe.h.

References registryWarningWasReported_.

00073 { return registryWarningWasReported_; }

void ConsumerPipe::initializeSelection ( Strings const &  fullTriggerList,
uint32  outputModuleId 
)

Initializes the event selection for this consumer based on the specified full list of triggers and the request ParameterSet that was specified in the constructor.

Definition at line 152 of file ConsumerPipe.cc.

References consumerId_, lat::endl(), eventSelector_, FDEBUG, hltOutputModuleId_, initializationDone, and triggerSelection_.

00154 {
00155   FDEBUG(5) << "Initializing consumer pipe, ID = " <<
00156     consumerId_ << std::endl;
00157 
00158   // store the output module id
00159   hltOutputModuleId_ = outputModuleId;
00160 
00161   // create our event selector
00162   eventSelector_.reset(new EventSelector(triggerSelection_,
00163                                          fullTriggerList));
00164   // indicate that initialization is complete
00165   initializationDone = true;
00166 
00167 }

bool ConsumerPipe::isActive (  )  const

Tests if the consumer is active.

The active state indicates that the consumer is connected and is actively requesting events.

Definition at line 173 of file ConsumerPipe.cc.

References lastEventRequestTime_, NULL, and timeToIdleState_.

00174 {
00175   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00176   return (timeDiff < timeToIdleState_);
00177 }

bool ConsumerPipe::isDisconnected (  )  const

Tests if the consumer has disconnected.

Definition at line 194 of file ConsumerPipe.cc.

References lastEventRequestTime_, NULL, and timeToDisconnectedState_.

Referenced by getEvent().

00195 {
00196   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00197   return (timeDiff >= timeToDisconnectedState_);
00198 }

bool ConsumerPipe::isIdle (  )  const

Tests if the consumer is idle (as opposed to active).

The idle state indicates that the consumer is still connected, but it hasn't requested an event in some time.

Definition at line 184 of file ConsumerPipe.cc.

References lastEventRequestTime_, NULL, timeToDisconnectedState_, and timeToIdleState_.

Referenced by getEvent().

00185 {
00186   time_t timeDiff = time(NULL) - lastEventRequestTime_;
00187   return (timeDiff >= timeToIdleState_ &&
00188           timeDiff <  timeToDisconnectedState_);
00189 }

bool stor::ConsumerPipe::isProxyServer (  )  const [inline]

Definition at line 72 of file ConsumerPipe.h.

References consumerIsProxyServer_.

00072 { return consumerIsProxyServer_; }

bool ConsumerPipe::isReadyForEvent ( double  currentTime = BaseCounter::getCurrentTime()  )  const

Tests if the consumer is ready for an event.

This method is often used in conjunction with the wantsEvent() method. In those cases, the wantsEvent() method should be called first to get "DESIRED" event statistics that are not biased by the consumer rate request.

Definition at line 207 of file ConsumerPipe.cc.

References initializationDone, lastConsideredEventTime_, minTimeBetweenEvents_, rateRequest_, and rateRequestCounter_.

00208 {
00209   // we're not ready if we haven't yet been initialized
00210   // or are no longer active
00211   if (! initializationDone) {return false;}
00212   if (! this->isActive()) {return false;}
00213 
00214   // check if enough time has elapsed since the last event was considered.
00215   // 16-Apr-2008, KAB:  The simple method here would be to always use
00216   // "currentTime - lastTime" is greater-than-or-equal-to the minimum time
00217   // between events.  However, that doesn't provide enough rate, in my
00218   // opinion.  For example, let's say that the rate of triggers for a
00219   // particular consumer is 1.0 Hz, and the consumer rate request is 10 Hz.
00220   // The simple method will undershoot the 1.0 Hz because occasionally
00221   // an event is just under the 1.0 sec cutoff.  Using a longer time frame
00222   // (with a RollingSampleCounter or something else) allows us to provide
00223   // a more accurate requested rate.
00224   if (! rateRequestCounter_->hasValidResult()) {
00225     return ((currentTime - lastConsideredEventTime_) >= minTimeBetweenEvents_);
00226   }
00227   else {
00228     return (rateRequestCounter_->getSampleRate(currentTime) < rateRequest_);
00229   }
00230 }

bool ConsumerPipe::pushEvent (  )  [private]

Definition at line 363 of file ConsumerPipe.cc.

References TestMuL1L2Filter_cff::cerr, consumerName_, stor::ReadData::d_, data, lat::endl(), eventQueue_, eventQueueLock_, FDEBUG, stor::func(), han_, headers_, i, len, NULL, stor::setopt(), EventMsgView::size(), and EventMsgView::startAddress().

Referenced by putEvent().

00364 {
00365   // fetch the most recent event
00366   boost::shared_ptr< std::vector<char> > bufPtr;
00367   {
00368     boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00369     if (! eventQueue_.empty())
00370     {
00371       bufPtr = eventQueue_.front();
00372       eventQueue_.pop_front();
00373     }
00374   }
00375   if (bufPtr.get() == NULL)
00376   {
00377     edm::LogError("pushEvent") << "========================================";
00378     edm::LogError("pushEvent") << "pushEvent called with empty event queue!";
00379     return false;
00380   }
00381 
00382   // push the next event out to a push mode consumer (SMProxyServer)
00383   FDEBUG(5) << "pushing out event to " << consumerName_ << std::endl;
00384   stor::ReadData data;
00385 
00386   data.d_.clear();
00387   // check if curl handle was obtained (at ctor) if not try again
00388   if(han_==0)
00389   {
00390     han_ = curl_easy_init();
00391     if(han_==0)
00392     {
00393       edm::LogError("pushEvent") << "Could not create curl handle";
00394       return false;
00395     }
00396     headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream");
00397     headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary");
00398     // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay
00399     headers_ = curl_slist_append(headers_, "Expect:");
00400     setopt(han_, CURLOPT_HTTPHEADER, headers_);
00401     setopt(han_, CURLOPT_URL, consumerName_.c_str());
00402     setopt(han_, CURLOPT_WRITEFUNCTION, func);
00403   }
00404 
00405   setopt(han_,CURLOPT_WRITEDATA,&data);
00406 
00407   // build the event message
00408   EventMsgView msgView(&(*bufPtr)[0]);
00409 
00410   // add the request message as a https post
00411   setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress());
00412   setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size());
00413 
00414   // send the HTTP POST, read the reply
00415   // explicitly close connection when using pthttps transport or sometimes it hangs
00416   // because somtimes curl does not see that the connection was closed and tries to reuse it
00417   setopt(han_,CURLOPT_FORBID_REUSE, 1);
00418   CURLcode messageStatus = curl_easy_perform(han_);
00419 
00420   if(messageStatus!=0)
00421   {
00422     cerr << "curl perform failed for pushEvent" << endl;
00423     edm::LogError("pushEvent") << "curl perform failed for pushEvent. "
00424         << "Could not register: probably XDAQ not running on Storage Manager"
00425         << " at " << consumerName_;
00426     return false;
00427   }
00428   // should really read the message to see if okay (if we had made one!)
00429   if(data.d_.length() == 0)
00430   {
00431     return true;
00432   } else {
00433     if(data.d_.length() > 0) {
00434       std::vector<char> buf(1024);
00435       int len = data.d_.length();
00436       buf.resize(len);
00437       for (int i=0; i<len ; i++) buf[i] = data.d_[i];
00438       const unsigned int MAX_DUMP_LENGTH = 1000;
00439       edm::LogError("pushEvent") << "========================================";
00440       edm::LogError("pushEvent") << "Unexpected pushEvent response!";
00441       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00442         edm::LogError("pushEvent") << "Here is the raw text that was returned:";
00443         edm::LogError("pushEvent") << data.d_;
00444       }
00445       else {
00446         edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH <<
00447           " characters of the raw text that was returned:";
00448         edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00449       }
00450       edm::LogError("pushEvent") << "========================================";
00451     }
00452   }
00453   return false;
00454 }

void ConsumerPipe::putEvent ( boost::shared_ptr< std::vector< char > >  bufPtr  ) 

Adds the specified event to this consumer pipe.

Definition at line 292 of file ConsumerPipe.cc.

References eventQueue_, eventQueueLock_, events_, stor::BaseCounter::getCurrentTime(), lastEventRequestTime_, longTermQueuedCounter_, longTermServedCounter_, ltQueueSizeWhenQueuedCounter_, maxQueueSize_, NULL, pushEvent(), pushEventFailures_, pushMode_, shortTermQueuedCounter_, shortTermServedCounter_, and stQueueSizeWhenQueuedCounter_.

00293 {
00294   // add this event to the queue
00295   boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_);
00296 
00297   // add the event to our statistics for "queued" events
00298   double now = BaseCounter::getCurrentTime();
00299   double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0;
00300   ltQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size());
00301   stQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size(), now);
00302   longTermQueuedCounter_->addSample(sizeInMB);
00303   shortTermQueuedCounter_->addSample(sizeInMB, now);
00304 
00305   eventQueue_.push_back(bufPtr);
00306 
00307   while (eventQueue_.size() > maxQueueSize_) {
00308     eventQueue_.pop_front();
00309   }
00310   // if a push mode consumer actually push the event out to SMProxyServer
00311   if(pushMode_) {
00312     bool success = pushEvent();
00313     // update the time of the most recent successful transaction
00314     if(!success) ++pushEventFailures_;
00315     else
00316     {
00317       lastEventRequestTime_ = time(NULL);
00318       ++events_;
00319       // add the event to our statistics for "served" events
00320       longTermServedCounter_->addSample(sizeInMB);
00321       shortTermServedCounter_->addSample(sizeInMB, now);
00322     }
00323   }
00324 }

void stor::ConsumerPipe::setPushMode ( bool  mode  )  [inline]

Definition at line 78 of file ConsumerPipe.h.

References pushMode_.

00078 { pushMode_ = mode; }

void ConsumerPipe::setRegistryWarning ( std::vector< char > const &  message  ) 

Definition at line 475 of file ConsumerPipe.cc.

References registryWarningMessage_, and registryWarningWasReported_.

00476 {
00477   // assign the registry warning text before setting the warning flag
00478   // to avoid race conditions in which the hasRegistryWarning() method
00479   // would return true but the message hasn't been set (simpler than
00480   // adding a mutex for the warning message string)
00481   registryWarningMessage_ = message;
00482   registryWarningWasReported_ = true;
00483 }

void ConsumerPipe::setRegistryWarning ( std::string const &  message  ) 

Definition at line 467 of file ConsumerPipe.cc.

References edmNew::copy().

00468 {
00469   // convert the string to a vector of char and then pass off the work
00470   std::vector<char> warningBuff(message.size());;
00471   std::copy(message.begin(), message.end(), warningBuff.begin());
00472   setRegistryWarning(warningBuff);
00473 }

bool ConsumerPipe::wantsEvent ( EventMsgView const &  eventView  )  const

Tests if the consumer wants the specified event.

This method is often used in conjuntion with the isReadyForEvent() method. In those cases, this method should be called first to generate "DESIRED" event statistics that are not biased by the consumer rate request.

Definition at line 238 of file ConsumerPipe.cc.

References eventQueue_, eventSelector_, stor::BaseCounter::getCurrentTime(), EventMsgView::hltCount(), hltOutputModuleId_, EventMsgView::hltTriggerBits(), initializationDone, longTermDesiredCounter_, ltQueueSizeWhenDesiredCounter_, EventMsgView::outModId(), shortTermDesiredCounter_, EventMsgView::size(), and stQueueSizeWhenDesiredCounter_.

00239 {
00240   // we're not interested in events if we haven't yet been initialized
00241   // or are no longer active
00242   if (! initializationDone) {return false;}
00243   if (! this->isActive()) {return false;}
00244 
00245   // the event must be from the correct HLT output module
00246   if (! this->isProxyServer() &&
00247       eventView.outModId() != hltOutputModuleId_) {return false;}
00248 
00249   // get trigger bits for this event and check using eventSelector_
00250   std::vector<unsigned char> hlt_out;
00251   hlt_out.resize(1 + (eventView.hltCount()-1)/4);
00252   eventView.hltTriggerBits(&hlt_out[0]);
00253   int num_paths = eventView.hltCount();
00254   bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths));
00255 
00256   // if we want this event, add it to our statistics for "desired"
00257   // or "acceptable" events.
00258   if (rc) {
00259     double now = BaseCounter::getCurrentTime();
00260     double sizeInMB = static_cast<double>(eventView.size()) / 1048576.0;
00261     ltQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size());
00262     stQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size(), now);
00263     longTermDesiredCounter_->addSample(sizeInMB);
00264     shortTermDesiredCounter_->addSample(sizeInMB, now);
00265   }
00266   return rc;
00267 }

void ConsumerPipe::wasConsidered ( double  currentTime = BaseCounter::getCurrentTime()  ) 

Tells the ConsumerPipe that an event was considered or accepted or queued.

This method may be used in conjunction with the putEvent() method, but it may be used independently if external prescaling is done. Basically, this method is used to tell the ConsumerPipe instance that it should update its internal state for keeping track of whether it is ready for another event.

This extra work is needed to support our fair-share event serving model. With fair-event-serving, we need a way for external entities to get a realistic event rate for a consumer by calling the isReadyForEvent() method independent of whether events actually end up being queued.

Definition at line 283 of file ConsumerPipe.cc.

References lastConsideredEventTime_, and rateRequestCounter_.

00284 {
00285   lastConsideredEventTime_ = currentTime;
00286   rateRequestCounter_->addSample(1.0, currentTime);
00287 }


Member Data Documentation

uint32 stor::ConsumerPipe::consumerId_ [private]

Definition at line 115 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getConsumerId(), initializeSelection(), and ~ConsumerPipe().

bool stor::ConsumerPipe::consumerIsProxyServer_ [private]

Definition at line 127 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), and isProxyServer().

std::string stor::ConsumerPipe::consumerName_ [private]

Definition at line 116 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getConsumerName(), and pushEvent().

std::string stor::ConsumerPipe::consumerPriority_ [private]

Definition at line 117 of file ConsumerPipe.h.

Referenced by ConsumerPipe().

std::deque< boost::shared_ptr< std::vector<char> > > stor::ConsumerPipe::eventQueue_ [private]

Definition at line 150 of file ConsumerPipe.h.

Referenced by clearQueue(), getEvent(), pushEvent(), putEvent(), and wantsEvent().

boost::mutex stor::ConsumerPipe::eventQueueLock_ [private]

Definition at line 154 of file ConsumerPipe.h.

Referenced by clearQueue(), getEvent(), pushEvent(), and putEvent().

int stor::ConsumerPipe::events_ [private]

Definition at line 118 of file ConsumerPipe.h.

Referenced by getEvent(), getEvents(), and putEvent().

boost::shared_ptr<edm::EventSelector> stor::ConsumerPipe::eventSelector_ [private]

Definition at line 130 of file ConsumerPipe.h.

Referenced by initializeSelection(), and wantsEvent().

CURL* stor::ConsumerPipe::han_ [private]

Definition at line 112 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), pushEvent(), and ~ConsumerPipe().

struct curl_slist* stor::ConsumerPipe::headers_ [read, private]

Definition at line 113 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), pushEvent(), and ~ConsumerPipe().

uint32 stor::ConsumerPipe::hltOutputModuleId_ [private]

Definition at line 122 of file ConsumerPipe.h.

Referenced by initializeSelection(), and wantsEvent().

std::string stor::ConsumerPipe::hltOutputSelection_ [private]

Definition at line 121 of file ConsumerPipe.h.

Referenced by getHLTOutputSelection().

std::string stor::ConsumerPipe::hostName_ [private]

Definition at line 126 of file ConsumerPipe.h.

Referenced by getHostName().

bool stor::ConsumerPipe::initializationDone [private]

Definition at line 138 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), initializeSelection(), isReadyForEvent(), and wantsEvent().

double stor::ConsumerPipe::lastConsideredEventTime_ [private]

Definition at line 125 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), isReadyForEvent(), and wasConsidered().

time_t stor::ConsumerPipe::lastEventRequestTime_ [private]

Definition at line 135 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getEvent(), getLastEventRequestTime(), isActive(), isDisconnected(), isIdle(), and putEvent().

boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::longTermDesiredCounter_ [private]

Definition at line 161 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and wantsEvent().

boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::longTermQueuedCounter_ [private]

Definition at line 163 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and putEvent().

boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::longTermServedCounter_ [private]

Definition at line 165 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEvent(), getEventCount(), getEventRate(), and putEvent().

boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::ltQueueSizeWhenDesiredCounter_ [private]

Definition at line 168 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getAverageQueueSize(), and wantsEvent().

boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::ltQueueSizeWhenQueuedCounter_ [private]

Definition at line 170 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getAverageQueueSize(), and putEvent().

const double ConsumerPipe::MAX_ACCEPT_INTERVAL = 86400.0 [static]

Initialize the maximum accept interval.

Definition at line 56 of file ConsumerPipe.h.

Referenced by ConsumerPipe().

unsigned int stor::ConsumerPipe::maxQueueSize_ [private]

Definition at line 151 of file ConsumerPipe.h.

Referenced by putEvent().

double stor::ConsumerPipe::minTimeBetweenEvents_ [private]

Definition at line 124 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), and isReadyForEvent().

unsigned int stor::ConsumerPipe::pushEventFailures_ [private]

Definition at line 143 of file ConsumerPipe.h.

Referenced by getPushEventFailures(), and putEvent().

bool stor::ConsumerPipe::pushMode_ [private]

Definition at line 141 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), putEvent(), and setPushMode().

double stor::ConsumerPipe::rateRequest_ [private]

Definition at line 120 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getRateRequest(), and isReadyForEvent().

boost::shared_ptr<RollingSampleCounter> stor::ConsumerPipe::rateRequestCounter_ [private]

Definition at line 123 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), isReadyForEvent(), and wasConsidered().

std::vector<char> stor::ConsumerPipe::registryWarningMessage_ [private]

Definition at line 147 of file ConsumerPipe.h.

Referenced by getRegistryWarning(), and setRegistryWarning().

bool stor::ConsumerPipe::registryWarningWasReported_ [private]

Definition at line 146 of file ConsumerPipe.h.

Referenced by clearRegistryWarning(), ConsumerPipe(), hasRegistryWarning(), and setRegistryWarning().

uint32 ConsumerPipe::rootId_ = 1 [static, private]

Initialize the static value for the root consumer id.

Definition at line 157 of file ConsumerPipe.h.

Referenced by ConsumerPipe().

boost::mutex ConsumerPipe::rootIdLock_ [static, private]

Initialize the static lock used to control access to the root ID.

Definition at line 158 of file ConsumerPipe.h.

Referenced by ConsumerPipe().

boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::shortTermDesiredCounter_ [private]

Definition at line 162 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and wantsEvent().

boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::shortTermQueuedCounter_ [private]

Definition at line 164 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and putEvent().

boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::shortTermServedCounter_ [private]

Definition at line 166 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEvent(), getEventCount(), getEventRate(), and putEvent().

boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::stQueueSizeWhenDesiredCounter_ [private]

Definition at line 169 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getAverageQueueSize(), and wantsEvent().

boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::stQueueSizeWhenQueuedCounter_ [private]

Definition at line 171 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), getAverageQueueSize(), and putEvent().

int stor::ConsumerPipe::timeToDisconnectedState_ [private]

Definition at line 134 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), isDisconnected(), and isIdle().

int stor::ConsumerPipe::timeToIdleState_ [private]

Definition at line 133 of file ConsumerPipe.h.

Referenced by ConsumerPipe(), isActive(), and isIdle().

Strings stor::ConsumerPipe::triggerSelection_ [private]

Definition at line 119 of file ConsumerPipe.h.

Referenced by getTriggerRequest(), getTriggerSelection(), and initializeSelection().


The documentation for this class was generated from the following files:
Generated on Tue Jun 9 18:52:50 2009 for CMSSW by  doxygen 1.5.4