#include <EventFilter/StorageManager/interface/ConsumerPipe.h>
Public Types | |
enum | STATS_SAMPLE_TYPE { QUEUED_EVENTS = 10, SERVED_EVENTS = 11, DESIRED_EVENTS = 12 } |
enum | STATS_TIME_FRAME { SHORT_TERM = 0, LONG_TERM = 1 } |
Public Member Functions | |
void | clearQueue () |
void | clearRegistryWarning () |
ConsumerPipe (std::string name, std::string priority, int activeTimeout, int idleTimeout, Strings triggerSelection, double rateRequest, std::string hltOutputSelection, std::string hostName, int queueSize) | |
ConsumerPipe constructor. | |
double | getAverageQueueSize (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime()) |
Returns the average queue size for the specified statistics types (short term vs. | |
uint32 | getConsumerId () const |
Returns the consumer ID associated with this pipe. | |
std::string | getConsumerName () |
double | getDataRate (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime()) |
Returns the data rate for the specified statistics types (short term vs. | |
double | getDuration (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime()) |
Returns the duration (in seconds) for the specified statistics types (short term vs. | |
boost::shared_ptr< std::vector < char > > | getEvent () |
Fetches the next event from this consumer pipe. | |
long long | getEventCount (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime()) |
Returns the number of events for the specified statistics types (short term vs. | |
double | getEventRate (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_SAMPLE_TYPE sampleType=QUEUED_EVENTS, double currentTime=BaseCounter::getCurrentTime()) |
Returns the rate of events for the specified statistics types (short term vs. | |
unsigned int | getEvents () |
std::string | getHLTOutputSelection () const |
std::string | getHostName () |
time_t | getLastEventRequestTime () |
unsigned int | getPushEventFailures () |
double | getRateRequest () const |
std::vector< char > | getRegistryWarning () |
std::vector< std::string > | getTriggerRequest () const |
Strings | getTriggerSelection () const |
bool | hasRegistryWarning () const |
void | initializeSelection (Strings const &fullTriggerList, uint32 outputModuleId) |
Initializes the event selection for this consumer based on the specified full list of triggers and the request ParameterSet that was specified in the constructor. | |
bool | isActive () const |
Tests if the consumer is active. | |
bool | isDisconnected () const |
Tests if the consumer has disconnected. | |
bool | isIdle () const |
Tests if the consumer is idle (as opposed to active). | |
bool | isProxyServer () const |
bool | isReadyForEvent (double currentTime=BaseCounter::getCurrentTime()) const |
Tests if the consumer is ready for an event. | |
void | putEvent (boost::shared_ptr< std::vector< char > > bufPtr) |
Adds the specified event to this consumer pipe. | |
void | setPushMode (bool mode) |
void | setRegistryWarning (std::vector< char > const &message) |
void | setRegistryWarning (std::string const &message) |
bool | wantsEvent (EventMsgView const &eventView) const |
Tests if the consumer wants the specified event. | |
void | wasConsidered (double currentTime=BaseCounter::getCurrentTime()) |
Tells the ConsumerPipe that an event was considered or accepted or queued. | |
~ConsumerPipe () | |
ConsumerPipe destructor. | |
Static Public Attributes | |
static const double | MAX_ACCEPT_INTERVAL = 86400.0 |
Initialize the maximum accept interval. | |
Private Member Functions | |
bool | pushEvent () |
Private Attributes | |
uint32 | consumerId_ |
bool | consumerIsProxyServer_ |
std::string | consumerName_ |
std::string | consumerPriority_ |
std::deque< boost::shared_ptr < std::vector< char > > > | eventQueue_ |
boost::mutex | eventQueueLock_ |
int | events_ |
boost::shared_ptr < edm::EventSelector > | eventSelector_ |
CURL * | han_ |
struct curl_slist * | headers_ |
uint32 | hltOutputModuleId_ |
std::string | hltOutputSelection_ |
std::string | hostName_ |
bool | initializationDone |
double | lastConsideredEventTime_ |
time_t | lastEventRequestTime_ |
boost::shared_ptr< ForeverCounter > | longTermDesiredCounter_ |
boost::shared_ptr< ForeverCounter > | longTermQueuedCounter_ |
boost::shared_ptr< ForeverCounter > | longTermServedCounter_ |
boost::shared_ptr< ForeverCounter > | ltQueueSizeWhenDesiredCounter_ |
boost::shared_ptr< ForeverCounter > | ltQueueSizeWhenQueuedCounter_ |
unsigned int | maxQueueSize_ |
double | minTimeBetweenEvents_ |
unsigned int | pushEventFailures_ |
bool | pushMode_ |
double | rateRequest_ |
boost::shared_ptr < RollingSampleCounter > | rateRequestCounter_ |
std::vector< char > | registryWarningMessage_ |
bool | registryWarningWasReported_ |
boost::shared_ptr < RollingIntervalCounter > | shortTermDesiredCounter_ |
boost::shared_ptr < RollingIntervalCounter > | shortTermQueuedCounter_ |
boost::shared_ptr < RollingIntervalCounter > | shortTermServedCounter_ |
boost::shared_ptr < RollingIntervalCounter > | stQueueSizeWhenDesiredCounter_ |
boost::shared_ptr < RollingIntervalCounter > | stQueueSizeWhenQueuedCounter_ |
int | timeToDisconnectedState_ |
int | timeToIdleState_ |
Strings | triggerSelection_ |
Static Private Attributes | |
static uint32 | rootId_ = 1 |
Initialize the static value for the root consumer id. | |
static boost::mutex | rootIdLock_ |
Initialize the static lock used to control access to the root ID. |
Definition at line 49 of file ConsumerPipe.h.
Definition at line 53 of file ConsumerPipe.h.
00053 { QUEUED_EVENTS = 10, SERVED_EVENTS = 11, 00054 DESIRED_EVENTS = 12 };
ConsumerPipe::ConsumerPipe | ( | std::string | name, | |
std::string | priority, | |||
int | activeTimeout, | |||
int | idleTimeout, | |||
Strings | triggerSelection, | |||
double | rateRequest, | |||
std::string | hltOutputSelection, | |||
std::string | hostName, | |||
int | queueSize | |||
) |
ConsumerPipe constructor.
Definition at line 39 of file ConsumerPipe.cc.
References consumerId_, consumerIsProxyServer_, consumerName_, consumerPriority_, stor::func(), stor::BaseCounter::getCurrentTime(), han_, headers_, stor::RollingSampleCounter::INCLUDE_SAMPLES_IMMEDIATELY, initializationDone, lastConsideredEventTime_, lastEventRequestTime_, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, ltQueueSizeWhenDesiredCounter_, ltQueueSizeWhenQueuedCounter_, MAX_ACCEPT_INTERVAL, minTimeBetweenEvents_, NULL, pushMode_, rateRequest_, rateRequestCounter_, registryWarningWasReported_, rootId_, rootIdLock_, stor::setopt(), shortTermDesiredCounter_, shortTermQueuedCounter_, shortTermServedCounter_, stQueueSizeWhenDesiredCounter_, stQueueSizeWhenQueuedCounter_, timeToDisconnectedState_, and timeToIdleState_.
00043 : 00044 han_(curl_easy_init()), 00045 headers_(), 00046 consumerName_(name),consumerPriority_(priority), 00047 events_(0), 00048 triggerSelection_(triggerSelection), 00049 rateRequest_(rateRequest), 00050 hltOutputSelection_(hltOutputSelection), 00051 hostName_(hostName), 00052 pushEventFailures_(0), 00053 maxQueueSize_(queueSize) 00054 { 00055 // initialize the time values we use for defining "states" 00056 timeToIdleState_ = activeTimeout; 00057 timeToDisconnectedState_ = activeTimeout + idleTimeout; 00058 lastEventRequestTime_ = time(NULL); 00059 initializationDone = false; 00060 pushMode_ = false; 00061 if(consumerPriority_.compare("PushMode") == 0) pushMode_ = true; 00062 registryWarningWasReported_ = false; 00063 00064 // determine if we're connected to a proxy server 00065 consumerIsProxyServer_ = false; 00066 //if (consumerName_ == PROXY_SERVER_NAME) 00067 if (consumerName_.find("urn") != std::string::npos && 00068 consumerName_.find("xdaq") != std::string::npos && 00069 consumerName_.find("pushEventData") != std::string::npos) 00070 { 00071 consumerIsProxyServer_ = true; 00072 } 00073 00074 // assign the consumer ID 00075 boost::mutex::scoped_lock scopedLockForRootId(rootIdLock_); 00076 consumerId_ = rootId_; 00077 rootId_++; 00078 00079 if(han_==0) 00080 { 00081 edm::LogError("ConsumerPipe") << "Could not create curl handle"; 00082 //std::cout << "Could not create curl handle" << std::endl; 00083 // throw exception here when we can make the SM go to a fail state from 00084 // another thread 00085 } else { 00086 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream"); 00087 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary"); 00088 // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay 00089 // for pthttps but we don't need the Expect: 100 continue anyway 00090 headers_ = curl_slist_append(headers_, "Expect:"); 00091 setopt(han_, CURLOPT_HTTPHEADER, headers_); 00092 setopt(han_, CURLOPT_URL, consumerName_.c_str()); 00093 setopt(han_, CURLOPT_WRITEFUNCTION, func); 00094 // debug options 00095 //setopt(han_,CURLOPT_VERBOSE, 1); 00096 //setopt(han_,CURLOPT_TCP_NODELAY, 1); 00097 } 00098 00099 // determine the amount of time that we need to wait between accepted 00100 // events. The request rate specified to this constructor is 00101 // converted to an interval that is used internally, and the interval 00102 // is required to be somewhat reasonable. 00103 if (rateRequest_ < (1.0 / MAX_ACCEPT_INTERVAL)) 00104 { 00105 minTimeBetweenEvents_ = MAX_ACCEPT_INTERVAL; 00106 } 00107 else 00108 { 00109 minTimeBetweenEvents_ = 1.0 / rateRequest_; // seconds 00110 } 00111 lastConsideredEventTime_ = BaseCounter::getCurrentTime(); 00112 rateRequestCounter_.reset(new RollingSampleCounter(50,1,60,RollingSampleCounter::INCLUDE_SAMPLES_IMMEDIATELY)); 00113 00114 // initialize the counters that we use for statistics 00115 longTermDesiredCounter_.reset(new ForeverCounter()); 00116 shortTermDesiredCounter_.reset(new RollingIntervalCounter(180,5,20)); 00117 longTermQueuedCounter_.reset(new ForeverCounter()); 00118 shortTermQueuedCounter_.reset(new RollingIntervalCounter(180,5,20)); 00119 longTermServedCounter_.reset(new ForeverCounter()); 00120 shortTermServedCounter_.reset(new RollingIntervalCounter(180,5,20)); 00121 00122 ltQueueSizeWhenDesiredCounter_.reset(new ForeverCounter()); 00123 stQueueSizeWhenDesiredCounter_.reset(new RollingIntervalCounter(180,5,20)); 00124 ltQueueSizeWhenQueuedCounter_.reset(new ForeverCounter()); 00125 stQueueSizeWhenQueuedCounter_.reset(new RollingIntervalCounter(180,5,20)); 00126 }
ConsumerPipe::~ConsumerPipe | ( | ) |
ConsumerPipe destructor.
Definition at line 131 of file ConsumerPipe.cc.
References consumerId_, lat::endl(), FDEBUG, han_, and headers_.
00132 { 00133 FDEBUG(5) << "Executing destructor for consumer pipe with ID = " << 00134 consumerId_ << std::endl; 00135 curl_slist_free_all(headers_); 00136 curl_easy_cleanup(han_); 00137 }
void ConsumerPipe::clearQueue | ( | ) |
Definition at line 456 of file ConsumerPipe.cc.
References eventQueue_, and eventQueueLock_.
Referenced by getEvent().
00457 { 00458 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00459 eventQueue_.clear(); 00460 }
void stor::ConsumerPipe::clearRegistryWarning | ( | ) | [inline] |
Definition at line 89 of file ConsumerPipe.h.
References registryWarningWasReported_.
00089 { registryWarningWasReported_ = false; }
double ConsumerPipe::getAverageQueueSize | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_SAMPLE_TYPE | sampleType = QUEUED_EVENTS , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Returns the average queue size for the specified statistics types (short term vs.
long term; desired vs. queued). The queue size is sampled before any additional events are added. For example, the average queue size for "queued" events is the size before each new event is queued.
Definition at line 622 of file ConsumerPipe.cc.
References ltQueueSizeWhenDesiredCounter_, ltQueueSizeWhenQueuedCounter_, QUEUED_EVENTS, SHORT_TERM, stQueueSizeWhenDesiredCounter_, and stQueueSizeWhenQueuedCounter_.
00625 { 00626 if (timeFrame == SHORT_TERM) { 00627 if (sampleType == QUEUED_EVENTS) { 00628 return stQueueSizeWhenQueuedCounter_->getValueAverage(currentTime); 00629 } 00630 else { 00631 return stQueueSizeWhenDesiredCounter_->getValueAverage(currentTime); 00632 } 00633 } 00634 else { 00635 if (sampleType == QUEUED_EVENTS) { 00636 return ltQueueSizeWhenQueuedCounter_->getValueAverage(); 00637 } 00638 else { 00639 return ltQueueSizeWhenDesiredCounter_->getValueAverage(); 00640 } 00641 } 00642 }
uint32 ConsumerPipe::getConsumerId | ( | ) | const |
Returns the consumer ID associated with this pipe.
Definition at line 142 of file ConsumerPipe.cc.
References consumerId_.
00143 { 00144 return consumerId_; 00145 }
std::string stor::ConsumerPipe::getConsumerName | ( | ) | [inline] |
Definition at line 80 of file ConsumerPipe.h.
References consumerName_.
00080 { return(consumerName_);}
double ConsumerPipe::getDataRate | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_SAMPLE_TYPE | sampleType = QUEUED_EVENTS , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Returns the data rate for the specified statistics types (short term vs.
long term; desired vs. queued vs. served).
Definition at line 553 of file ConsumerPipe.cc.
References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.
00556 { 00557 if (timeFrame == SHORT_TERM) { 00558 if (sampleType == QUEUED_EVENTS) { 00559 return shortTermQueuedCounter_->getValueRate(currentTime); 00560 } 00561 else if (sampleType == DESIRED_EVENTS) { 00562 return shortTermDesiredCounter_->getValueRate(currentTime); 00563 } 00564 else { 00565 return shortTermServedCounter_->getValueRate(currentTime); 00566 } 00567 } 00568 else { 00569 if (sampleType == QUEUED_EVENTS) { 00570 return longTermQueuedCounter_->getValueRate(currentTime); 00571 } 00572 else if (sampleType == DESIRED_EVENTS) { 00573 return longTermDesiredCounter_->getValueRate(currentTime); 00574 } 00575 else { 00576 return longTermServedCounter_->getValueRate(currentTime); 00577 } 00578 } 00579 }
double ConsumerPipe::getDuration | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_SAMPLE_TYPE | sampleType = QUEUED_EVENTS , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Returns the duration (in seconds) for the specified statistics types (short term vs.
long term; desired vs. queued vs. served). "Duration" here means the length of time in which the specified statistics have been collected.
Definition at line 587 of file ConsumerPipe.cc.
References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.
00590 { 00591 if (timeFrame == SHORT_TERM) { 00592 if (sampleType == QUEUED_EVENTS) { 00593 return shortTermQueuedCounter_->getDuration(currentTime); 00594 } 00595 else if (sampleType == DESIRED_EVENTS) { 00596 return shortTermDesiredCounter_->getDuration(currentTime); 00597 } 00598 else { 00599 return shortTermServedCounter_->getDuration(currentTime); 00600 } 00601 } 00602 else { 00603 if (sampleType == QUEUED_EVENTS) { 00604 return longTermQueuedCounter_->getDuration(currentTime); 00605 } 00606 else if (sampleType == DESIRED_EVENTS) { 00607 return longTermDesiredCounter_->getDuration(currentTime); 00608 } 00609 else { 00610 return longTermServedCounter_->getDuration(currentTime); 00611 } 00612 } 00613 }
boost::shared_ptr< std::vector< char > > ConsumerPipe::getEvent | ( | ) |
Fetches the next event from this consumer pipe.
If there are no events in the pipe, an empty shared_ptr will be returned (ptr.get() == NULL).
Definition at line 331 of file ConsumerPipe.cc.
References clearQueue(), eventQueue_, eventQueueLock_, events_, isDisconnected(), isIdle(), lastEventRequestTime_, longTermServedCounter_, NULL, and shortTermServedCounter_.
00332 { 00333 // 25-Aug-2005, KAB: clear out any stale event(s) 00334 if (isIdle() || isDisconnected()) 00335 { 00336 this->clearQueue(); 00337 } 00338 00339 // fetch the most recent event 00340 boost::shared_ptr< std::vector<char> > bufPtr; 00341 { 00342 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00343 if (! eventQueue_.empty()) 00344 { 00345 bufPtr = eventQueue_.front(); 00346 eventQueue_.pop_front(); 00347 00348 // add the event to our statistics for "served" events 00349 ++events_; 00350 double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0; 00351 longTermServedCounter_->addSample(sizeInMB); 00352 shortTermServedCounter_->addSample(sizeInMB); 00353 } 00354 } 00355 00356 // update the time of the most recent request 00357 lastEventRequestTime_ = time(NULL); 00358 00359 // return the event 00360 return bufPtr; 00361 }
long long ConsumerPipe::getEventCount | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_SAMPLE_TYPE | sampleType = QUEUED_EVENTS , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Returns the number of events for the specified statistics types (short term vs.
long term; desired vs. queued vs. served).
Definition at line 489 of file ConsumerPipe.cc.
References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.
00492 { 00493 if (timeFrame == SHORT_TERM) { 00494 if (sampleType == QUEUED_EVENTS) { 00495 return shortTermQueuedCounter_->getSampleCount(currentTime); 00496 } 00497 else if (sampleType == DESIRED_EVENTS) { 00498 return shortTermDesiredCounter_->getSampleCount(currentTime); 00499 } 00500 else { 00501 return shortTermServedCounter_->getSampleCount(currentTime); 00502 } 00503 } 00504 else { 00505 if (sampleType == QUEUED_EVENTS) { 00506 return longTermQueuedCounter_->getSampleCount(); 00507 } 00508 else if (sampleType == DESIRED_EVENTS) { 00509 return longTermDesiredCounter_->getSampleCount(); 00510 } 00511 else { 00512 return longTermServedCounter_->getSampleCount(); 00513 } 00514 } 00515 }
double ConsumerPipe::getEventRate | ( | STATS_TIME_FRAME | timeFrame = SHORT_TERM , |
|
STATS_SAMPLE_TYPE | sampleType = QUEUED_EVENTS , |
|||
double | currentTime = BaseCounter::getCurrentTime() | |||
) |
Returns the rate of events for the specified statistics types (short term vs.
long term; desired vs. queued vs. served).
Definition at line 521 of file ConsumerPipe.cc.
References DESIRED_EVENTS, longTermDesiredCounter_, longTermQueuedCounter_, longTermServedCounter_, QUEUED_EVENTS, SHORT_TERM, shortTermDesiredCounter_, shortTermQueuedCounter_, and shortTermServedCounter_.
00524 { 00525 if (timeFrame == SHORT_TERM) { 00526 if (sampleType == QUEUED_EVENTS) { 00527 return shortTermQueuedCounter_->getSampleRate(currentTime); 00528 } 00529 else if (sampleType == DESIRED_EVENTS) { 00530 return shortTermDesiredCounter_->getSampleRate(currentTime); 00531 } 00532 else { 00533 return shortTermServedCounter_->getSampleRate(currentTime); 00534 } 00535 } 00536 else { 00537 if (sampleType == QUEUED_EVENTS) { 00538 return longTermQueuedCounter_->getSampleRate(currentTime); 00539 } 00540 else if (sampleType == DESIRED_EVENTS) { 00541 return longTermDesiredCounter_->getSampleRate(currentTime); 00542 } 00543 else { 00544 return longTermServedCounter_->getSampleRate(currentTime); 00545 } 00546 } 00547 }
unsigned int stor::ConsumerPipe::getEvents | ( | ) | [inline] |
std::string stor::ConsumerPipe::getHLTOutputSelection | ( | ) | const [inline] |
Definition at line 108 of file ConsumerPipe.h.
References hltOutputSelection_.
00108 { return hltOutputSelection_; }
std::string stor::ConsumerPipe::getHostName | ( | ) | [inline] |
time_t stor::ConsumerPipe::getLastEventRequestTime | ( | ) | [inline] |
Definition at line 83 of file ConsumerPipe.h.
References lastEventRequestTime_.
00083 { return(lastEventRequestTime_);}
unsigned int stor::ConsumerPipe::getPushEventFailures | ( | ) | [inline] |
Definition at line 81 of file ConsumerPipe.h.
References pushEventFailures_.
00081 { return(pushEventFailures_);}
double stor::ConsumerPipe::getRateRequest | ( | ) | const [inline] |
Definition at line 107 of file ConsumerPipe.h.
References rateRequest_.
00107 { return rateRequest_; }
std::vector<char> stor::ConsumerPipe::getRegistryWarning | ( | ) | [inline] |
Definition at line 88 of file ConsumerPipe.h.
References registryWarningMessage_.
00088 { return registryWarningMessage_; }
std::vector< std::string > ConsumerPipe::getTriggerRequest | ( | ) | const |
Definition at line 462 of file ConsumerPipe.cc.
References triggerSelection_.
00463 { 00464 return triggerSelection_; 00465 }
Strings stor::ConsumerPipe::getTriggerSelection | ( | ) | const [inline] |
Definition at line 106 of file ConsumerPipe.h.
References triggerSelection_.
00106 { return triggerSelection_; }
bool stor::ConsumerPipe::hasRegistryWarning | ( | ) | const [inline] |
Definition at line 73 of file ConsumerPipe.h.
References registryWarningWasReported_.
00073 { return registryWarningWasReported_; }
Initializes the event selection for this consumer based on the specified full list of triggers and the request ParameterSet that was specified in the constructor.
Definition at line 152 of file ConsumerPipe.cc.
References consumerId_, lat::endl(), eventSelector_, FDEBUG, hltOutputModuleId_, initializationDone, and triggerSelection_.
00154 { 00155 FDEBUG(5) << "Initializing consumer pipe, ID = " << 00156 consumerId_ << std::endl; 00157 00158 // store the output module id 00159 hltOutputModuleId_ = outputModuleId; 00160 00161 // create our event selector 00162 eventSelector_.reset(new EventSelector(triggerSelection_, 00163 fullTriggerList)); 00164 // indicate that initialization is complete 00165 initializationDone = true; 00166 00167 }
bool ConsumerPipe::isActive | ( | ) | const |
Tests if the consumer is active.
The active state indicates that the consumer is connected and is actively requesting events.
Definition at line 173 of file ConsumerPipe.cc.
References lastEventRequestTime_, NULL, and timeToIdleState_.
00174 { 00175 time_t timeDiff = time(NULL) - lastEventRequestTime_; 00176 return (timeDiff < timeToIdleState_); 00177 }
bool ConsumerPipe::isDisconnected | ( | ) | const |
Tests if the consumer has disconnected.
Definition at line 194 of file ConsumerPipe.cc.
References lastEventRequestTime_, NULL, and timeToDisconnectedState_.
Referenced by getEvent().
00195 { 00196 time_t timeDiff = time(NULL) - lastEventRequestTime_; 00197 return (timeDiff >= timeToDisconnectedState_); 00198 }
bool ConsumerPipe::isIdle | ( | ) | const |
Tests if the consumer is idle (as opposed to active).
The idle state indicates that the consumer is still connected, but it hasn't requested an event in some time.
Definition at line 184 of file ConsumerPipe.cc.
References lastEventRequestTime_, NULL, timeToDisconnectedState_, and timeToIdleState_.
Referenced by getEvent().
00185 { 00186 time_t timeDiff = time(NULL) - lastEventRequestTime_; 00187 return (timeDiff >= timeToIdleState_ && 00188 timeDiff < timeToDisconnectedState_); 00189 }
bool stor::ConsumerPipe::isProxyServer | ( | ) | const [inline] |
Definition at line 72 of file ConsumerPipe.h.
References consumerIsProxyServer_.
00072 { return consumerIsProxyServer_; }
bool ConsumerPipe::isReadyForEvent | ( | double | currentTime = BaseCounter::getCurrentTime() |
) | const |
Tests if the consumer is ready for an event.
This method is often used in conjunction with the wantsEvent() method. In those cases, the wantsEvent() method should be called first to get "DESIRED" event statistics that are not biased by the consumer rate request.
Definition at line 207 of file ConsumerPipe.cc.
References initializationDone, lastConsideredEventTime_, minTimeBetweenEvents_, rateRequest_, and rateRequestCounter_.
00208 { 00209 // we're not ready if we haven't yet been initialized 00210 // or are no longer active 00211 if (! initializationDone) {return false;} 00212 if (! this->isActive()) {return false;} 00213 00214 // check if enough time has elapsed since the last event was considered. 00215 // 16-Apr-2008, KAB: The simple method here would be to always use 00216 // "currentTime - lastTime" is greater-than-or-equal-to the minimum time 00217 // between events. However, that doesn't provide enough rate, in my 00218 // opinion. For example, let's say that the rate of triggers for a 00219 // particular consumer is 1.0 Hz, and the consumer rate request is 10 Hz. 00220 // The simple method will undershoot the 1.0 Hz because occasionally 00221 // an event is just under the 1.0 sec cutoff. Using a longer time frame 00222 // (with a RollingSampleCounter or something else) allows us to provide 00223 // a more accurate requested rate. 00224 if (! rateRequestCounter_->hasValidResult()) { 00225 return ((currentTime - lastConsideredEventTime_) >= minTimeBetweenEvents_); 00226 } 00227 else { 00228 return (rateRequestCounter_->getSampleRate(currentTime) < rateRequest_); 00229 } 00230 }
bool ConsumerPipe::pushEvent | ( | ) | [private] |
Definition at line 363 of file ConsumerPipe.cc.
References TestMuL1L2Filter_cff::cerr, consumerName_, stor::ReadData::d_, data, lat::endl(), eventQueue_, eventQueueLock_, FDEBUG, stor::func(), han_, headers_, i, len, NULL, stor::setopt(), EventMsgView::size(), and EventMsgView::startAddress().
Referenced by putEvent().
00364 { 00365 // fetch the most recent event 00366 boost::shared_ptr< std::vector<char> > bufPtr; 00367 { 00368 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00369 if (! eventQueue_.empty()) 00370 { 00371 bufPtr = eventQueue_.front(); 00372 eventQueue_.pop_front(); 00373 } 00374 } 00375 if (bufPtr.get() == NULL) 00376 { 00377 edm::LogError("pushEvent") << "========================================"; 00378 edm::LogError("pushEvent") << "pushEvent called with empty event queue!"; 00379 return false; 00380 } 00381 00382 // push the next event out to a push mode consumer (SMProxyServer) 00383 FDEBUG(5) << "pushing out event to " << consumerName_ << std::endl; 00384 stor::ReadData data; 00385 00386 data.d_.clear(); 00387 // check if curl handle was obtained (at ctor) if not try again 00388 if(han_==0) 00389 { 00390 han_ = curl_easy_init(); 00391 if(han_==0) 00392 { 00393 edm::LogError("pushEvent") << "Could not create curl handle"; 00394 return false; 00395 } 00396 headers_ = curl_slist_append(headers_, "Content-Type: application/octet-stream"); 00397 headers_ = curl_slist_append(headers_, "Content-Transfer-Encoding: binary"); 00398 // Avoid the Expect: 100 continue automatic header that gives a 2 sec delay 00399 headers_ = curl_slist_append(headers_, "Expect:"); 00400 setopt(han_, CURLOPT_HTTPHEADER, headers_); 00401 setopt(han_, CURLOPT_URL, consumerName_.c_str()); 00402 setopt(han_, CURLOPT_WRITEFUNCTION, func); 00403 } 00404 00405 setopt(han_,CURLOPT_WRITEDATA,&data); 00406 00407 // build the event message 00408 EventMsgView msgView(&(*bufPtr)[0]); 00409 00410 // add the request message as a https post 00411 setopt(han_, CURLOPT_POSTFIELDS, msgView.startAddress()); 00412 setopt(han_, CURLOPT_POSTFIELDSIZE, msgView.size()); 00413 00414 // send the HTTP POST, read the reply 00415 // explicitly close connection when using pthttps transport or sometimes it hangs 00416 // because somtimes curl does not see that the connection was closed and tries to reuse it 00417 setopt(han_,CURLOPT_FORBID_REUSE, 1); 00418 CURLcode messageStatus = curl_easy_perform(han_); 00419 00420 if(messageStatus!=0) 00421 { 00422 cerr << "curl perform failed for pushEvent" << endl; 00423 edm::LogError("pushEvent") << "curl perform failed for pushEvent. " 00424 << "Could not register: probably XDAQ not running on Storage Manager" 00425 << " at " << consumerName_; 00426 return false; 00427 } 00428 // should really read the message to see if okay (if we had made one!) 00429 if(data.d_.length() == 0) 00430 { 00431 return true; 00432 } else { 00433 if(data.d_.length() > 0) { 00434 std::vector<char> buf(1024); 00435 int len = data.d_.length(); 00436 buf.resize(len); 00437 for (int i=0; i<len ; i++) buf[i] = data.d_[i]; 00438 const unsigned int MAX_DUMP_LENGTH = 1000; 00439 edm::LogError("pushEvent") << "========================================"; 00440 edm::LogError("pushEvent") << "Unexpected pushEvent response!"; 00441 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00442 edm::LogError("pushEvent") << "Here is the raw text that was returned:"; 00443 edm::LogError("pushEvent") << data.d_; 00444 } 00445 else { 00446 edm::LogError("pushEvent") << "Here are the first " << MAX_DUMP_LENGTH << 00447 " characters of the raw text that was returned:"; 00448 edm::LogError("pushEvent") << (data.d_.substr(0, MAX_DUMP_LENGTH)); 00449 } 00450 edm::LogError("pushEvent") << "========================================"; 00451 } 00452 } 00453 return false; 00454 }
void ConsumerPipe::putEvent | ( | boost::shared_ptr< std::vector< char > > | bufPtr | ) |
Adds the specified event to this consumer pipe.
Definition at line 292 of file ConsumerPipe.cc.
References eventQueue_, eventQueueLock_, events_, stor::BaseCounter::getCurrentTime(), lastEventRequestTime_, longTermQueuedCounter_, longTermServedCounter_, ltQueueSizeWhenQueuedCounter_, maxQueueSize_, NULL, pushEvent(), pushEventFailures_, pushMode_, shortTermQueuedCounter_, shortTermServedCounter_, and stQueueSizeWhenQueuedCounter_.
00293 { 00294 // add this event to the queue 00295 boost::mutex::scoped_lock scopedLockForEventQueue(eventQueueLock_); 00296 00297 // add the event to our statistics for "queued" events 00298 double now = BaseCounter::getCurrentTime(); 00299 double sizeInMB = static_cast<double>(bufPtr->size()) / 1048576.0; 00300 ltQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size()); 00301 stQueueSizeWhenQueuedCounter_->addSample(eventQueue_.size(), now); 00302 longTermQueuedCounter_->addSample(sizeInMB); 00303 shortTermQueuedCounter_->addSample(sizeInMB, now); 00304 00305 eventQueue_.push_back(bufPtr); 00306 00307 while (eventQueue_.size() > maxQueueSize_) { 00308 eventQueue_.pop_front(); 00309 } 00310 // if a push mode consumer actually push the event out to SMProxyServer 00311 if(pushMode_) { 00312 bool success = pushEvent(); 00313 // update the time of the most recent successful transaction 00314 if(!success) ++pushEventFailures_; 00315 else 00316 { 00317 lastEventRequestTime_ = time(NULL); 00318 ++events_; 00319 // add the event to our statistics for "served" events 00320 longTermServedCounter_->addSample(sizeInMB); 00321 shortTermServedCounter_->addSample(sizeInMB, now); 00322 } 00323 } 00324 }
void ConsumerPipe::setRegistryWarning | ( | std::vector< char > const & | message | ) |
Definition at line 475 of file ConsumerPipe.cc.
References registryWarningMessage_, and registryWarningWasReported_.
00476 { 00477 // assign the registry warning text before setting the warning flag 00478 // to avoid race conditions in which the hasRegistryWarning() method 00479 // would return true but the message hasn't been set (simpler than 00480 // adding a mutex for the warning message string) 00481 registryWarningMessage_ = message; 00482 registryWarningWasReported_ = true; 00483 }
void ConsumerPipe::setRegistryWarning | ( | std::string const & | message | ) |
Definition at line 467 of file ConsumerPipe.cc.
References edmNew::copy().
00468 { 00469 // convert the string to a vector of char and then pass off the work 00470 std::vector<char> warningBuff(message.size());; 00471 std::copy(message.begin(), message.end(), warningBuff.begin()); 00472 setRegistryWarning(warningBuff); 00473 }
bool ConsumerPipe::wantsEvent | ( | EventMsgView const & | eventView | ) | const |
Tests if the consumer wants the specified event.
This method is often used in conjuntion with the isReadyForEvent() method. In those cases, this method should be called first to generate "DESIRED" event statistics that are not biased by the consumer rate request.
Definition at line 238 of file ConsumerPipe.cc.
References eventQueue_, eventSelector_, stor::BaseCounter::getCurrentTime(), EventMsgView::hltCount(), hltOutputModuleId_, EventMsgView::hltTriggerBits(), initializationDone, longTermDesiredCounter_, ltQueueSizeWhenDesiredCounter_, EventMsgView::outModId(), shortTermDesiredCounter_, EventMsgView::size(), and stQueueSizeWhenDesiredCounter_.
00239 { 00240 // we're not interested in events if we haven't yet been initialized 00241 // or are no longer active 00242 if (! initializationDone) {return false;} 00243 if (! this->isActive()) {return false;} 00244 00245 // the event must be from the correct HLT output module 00246 if (! this->isProxyServer() && 00247 eventView.outModId() != hltOutputModuleId_) {return false;} 00248 00249 // get trigger bits for this event and check using eventSelector_ 00250 std::vector<unsigned char> hlt_out; 00251 hlt_out.resize(1 + (eventView.hltCount()-1)/4); 00252 eventView.hltTriggerBits(&hlt_out[0]); 00253 int num_paths = eventView.hltCount(); 00254 bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths)); 00255 00256 // if we want this event, add it to our statistics for "desired" 00257 // or "acceptable" events. 00258 if (rc) { 00259 double now = BaseCounter::getCurrentTime(); 00260 double sizeInMB = static_cast<double>(eventView.size()) / 1048576.0; 00261 ltQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size()); 00262 stQueueSizeWhenDesiredCounter_->addSample(eventQueue_.size(), now); 00263 longTermDesiredCounter_->addSample(sizeInMB); 00264 shortTermDesiredCounter_->addSample(sizeInMB, now); 00265 } 00266 return rc; 00267 }
void ConsumerPipe::wasConsidered | ( | double | currentTime = BaseCounter::getCurrentTime() |
) |
Tells the ConsumerPipe that an event was considered or accepted or queued.
This method may be used in conjunction with the putEvent() method, but it may be used independently if external prescaling is done. Basically, this method is used to tell the ConsumerPipe instance that it should update its internal state for keeping track of whether it is ready for another event.
This extra work is needed to support our fair-share event serving model. With fair-event-serving, we need a way for external entities to get a realistic event rate for a consumer by calling the isReadyForEvent() method independent of whether events actually end up being queued.
Definition at line 283 of file ConsumerPipe.cc.
References lastConsideredEventTime_, and rateRequestCounter_.
00284 { 00285 lastConsideredEventTime_ = currentTime; 00286 rateRequestCounter_->addSample(1.0, currentTime); 00287 }
uint32 stor::ConsumerPipe::consumerId_ [private] |
Definition at line 115 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getConsumerId(), initializeSelection(), and ~ConsumerPipe().
std::string stor::ConsumerPipe::consumerName_ [private] |
Definition at line 116 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getConsumerName(), and pushEvent().
std::string stor::ConsumerPipe::consumerPriority_ [private] |
std::deque< boost::shared_ptr< std::vector<char> > > stor::ConsumerPipe::eventQueue_ [private] |
Definition at line 150 of file ConsumerPipe.h.
Referenced by clearQueue(), getEvent(), pushEvent(), putEvent(), and wantsEvent().
Definition at line 154 of file ConsumerPipe.h.
Referenced by clearQueue(), getEvent(), pushEvent(), and putEvent().
int stor::ConsumerPipe::events_ [private] |
Definition at line 118 of file ConsumerPipe.h.
Referenced by getEvent(), getEvents(), and putEvent().
boost::shared_ptr<edm::EventSelector> stor::ConsumerPipe::eventSelector_ [private] |
Definition at line 130 of file ConsumerPipe.h.
Referenced by initializeSelection(), and wantsEvent().
CURL* stor::ConsumerPipe::han_ [private] |
Definition at line 112 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), pushEvent(), and ~ConsumerPipe().
struct curl_slist* stor::ConsumerPipe::headers_ [read, private] |
Definition at line 113 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), pushEvent(), and ~ConsumerPipe().
uint32 stor::ConsumerPipe::hltOutputModuleId_ [private] |
Definition at line 122 of file ConsumerPipe.h.
Referenced by initializeSelection(), and wantsEvent().
std::string stor::ConsumerPipe::hltOutputSelection_ [private] |
std::string stor::ConsumerPipe::hostName_ [private] |
bool stor::ConsumerPipe::initializationDone [private] |
Definition at line 138 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), initializeSelection(), isReadyForEvent(), and wantsEvent().
double stor::ConsumerPipe::lastConsideredEventTime_ [private] |
Definition at line 125 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), isReadyForEvent(), and wasConsidered().
time_t stor::ConsumerPipe::lastEventRequestTime_ [private] |
Definition at line 135 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getEvent(), getLastEventRequestTime(), isActive(), isDisconnected(), isIdle(), and putEvent().
boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::longTermDesiredCounter_ [private] |
Definition at line 161 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and wantsEvent().
boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::longTermQueuedCounter_ [private] |
Definition at line 163 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and putEvent().
boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::longTermServedCounter_ [private] |
Definition at line 165 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEvent(), getEventCount(), getEventRate(), and putEvent().
boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::ltQueueSizeWhenDesiredCounter_ [private] |
Definition at line 168 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getAverageQueueSize(), and wantsEvent().
boost::shared_ptr<ForeverCounter> stor::ConsumerPipe::ltQueueSizeWhenQueuedCounter_ [private] |
Definition at line 170 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getAverageQueueSize(), and putEvent().
const double ConsumerPipe::MAX_ACCEPT_INTERVAL = 86400.0 [static] |
Initialize the maximum accept interval.
Definition at line 56 of file ConsumerPipe.h.
Referenced by ConsumerPipe().
unsigned int stor::ConsumerPipe::maxQueueSize_ [private] |
double stor::ConsumerPipe::minTimeBetweenEvents_ [private] |
unsigned int stor::ConsumerPipe::pushEventFailures_ [private] |
Definition at line 143 of file ConsumerPipe.h.
Referenced by getPushEventFailures(), and putEvent().
bool stor::ConsumerPipe::pushMode_ [private] |
Definition at line 141 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), putEvent(), and setPushMode().
double stor::ConsumerPipe::rateRequest_ [private] |
Definition at line 120 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getRateRequest(), and isReadyForEvent().
boost::shared_ptr<RollingSampleCounter> stor::ConsumerPipe::rateRequestCounter_ [private] |
Definition at line 123 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), isReadyForEvent(), and wasConsidered().
std::vector<char> stor::ConsumerPipe::registryWarningMessage_ [private] |
Definition at line 147 of file ConsumerPipe.h.
Referenced by getRegistryWarning(), and setRegistryWarning().
Definition at line 146 of file ConsumerPipe.h.
Referenced by clearRegistryWarning(), ConsumerPipe(), hasRegistryWarning(), and setRegistryWarning().
uint32 ConsumerPipe::rootId_ = 1 [static, private] |
Initialize the static value for the root consumer id.
Definition at line 157 of file ConsumerPipe.h.
Referenced by ConsumerPipe().
boost::mutex ConsumerPipe::rootIdLock_ [static, private] |
Initialize the static lock used to control access to the root ID.
Definition at line 158 of file ConsumerPipe.h.
Referenced by ConsumerPipe().
boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::shortTermDesiredCounter_ [private] |
Definition at line 162 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and wantsEvent().
boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::shortTermQueuedCounter_ [private] |
Definition at line 164 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEventCount(), getEventRate(), and putEvent().
boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::shortTermServedCounter_ [private] |
Definition at line 166 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getDataRate(), getDuration(), getEvent(), getEventCount(), getEventRate(), and putEvent().
boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::stQueueSizeWhenDesiredCounter_ [private] |
Definition at line 169 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getAverageQueueSize(), and wantsEvent().
boost::shared_ptr<RollingIntervalCounter> stor::ConsumerPipe::stQueueSizeWhenQueuedCounter_ [private] |
Definition at line 171 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), getAverageQueueSize(), and putEvent().
Definition at line 134 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), isDisconnected(), and isIdle().
int stor::ConsumerPipe::timeToIdleState_ [private] |
Definition at line 133 of file ConsumerPipe.h.
Referenced by ConsumerPipe(), isActive(), and isIdle().
Strings stor::ConsumerPipe::triggerSelection_ [private] |
Definition at line 119 of file ConsumerPipe.h.
Referenced by getTriggerRequest(), getTriggerSelection(), and initializeSelection().