4 #ifndef EventFilter_StorageManager_QueueCollection_h
5 #define EventFilter_StorageManager_QueueCollection_h
10 #include "boost/bind.hpp"
11 #include "boost/thread/mutex.hpp"
12 #include "boost/shared_ptr.hpp"
154 typedef boost::shared_ptr<ExpirableDiscardNewQueue_t>
156 typedef boost::shared_ptr<ExpirableDiscardOldQueue_t>
211 void throwUnknownQueueid(
const QueueID&
id)
213 std::ostringstream
msg;
214 msg <<
"Unable to retrieve queue with signature: ";
216 XCEPT_RAISE(exception::UnknownQueueId, msg.str());
222 protectDiscardNewQueues_(),
223 protectDiscardOldQueues_(),
228 consumerMonitorCollection_( ccp )
245 discardNewQueues_.at(
id.
index())->setStalenessInterval(interval);
247 catch(std::out_of_range)
249 throwUnknownQueueid(
id);
258 discardOldQueues_.at(
id.
index())->setStalenessInterval(interval);
260 catch(std::out_of_range)
262 throwUnknownQueueid(
id);
268 throwUnknownQueueid(
id);
286 result = discardNewQueues_.at(
id.
index())->stalenessInterval();
288 catch(std::out_of_range)
290 throwUnknownQueueid(
id);
299 result = discardOldQueues_.at(
id.
index())->stalenessInterval();
301 catch(std::out_of_range)
303 throwUnknownQueueid(
id);
309 throwUnknownQueueid(
id);
325 const ConsumerID& cid = reginfo->consumerId();
329 if (!cid.
isValid())
return qid;
331 if (queueIdLookup_.find(cid) != queueIdLookup_.end())
return qid;
333 if ( reginfo->uniqueEvents() )
337 ReginfoLookup_t::const_iterator it =
338 queueReginfoLookup_.find(reginfo);
339 if ( it != queueReginfoLookup_.end() )
342 queueIdLookup_[cid] = qid;
347 qid = getQueue(reginfo, now);
348 queueIdLookup_[cid] = qid;
349 queueReginfoLookup_[reginfo] = qid;
362 const ConsumerID& cid = reginfo->consumerId();
366 if (!cid.
isValid())
return qid;
368 if (queueIdLookup_.find(cid) != queueIdLookup_.end())
return qid;
369 qid = getQueue(reginfo, now);
370 queueIdLookup_[cid] = qid;
387 reginfo->queueSize(),
388 reginfo->secondsToStale(),
392 discardNewQueues_.push_back(newborn);
395 discardNewQueues_.size()-1
403 reginfo->queueSize(),
404 reginfo->secondsToStale(),
408 discardOldQueues_.push_back(newborn);
411 discardOldQueues_.size()-1
423 WriteLock_t lockDiscardNew(protectDiscardNewQueues_);
424 WriteLock_t lockDiscardOld(protectDiscardOldQueues_);
425 discardNewQueues_.clear();
426 discardOldQueues_.clear();
429 queueIdLookup_.clear();
438 ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
439 ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
440 return discardNewQueues_.size() + discardOldQueues_.size();
447 ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
448 ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
451 QueueIDs routes =
event.getEventConsumerTags();
453 for( QueueIDs::const_iterator it = routes.begin(), itEnd = routes.end();
456 const SizeType droppedEvents = enqueueEvent_( *it, event, now );
457 if ( droppedEvents > 0 )
458 consumerMonitorCollection_.addDroppedEvents( *it, droppedEvents );
460 consumerMonitorCollection_.addQueuedEventSample( *it, event.totalDataSize() );
476 if ( discardNewQueues_.at(
id.index())->deqNowait(result) )
477 consumerMonitorCollection_.addServedEventSample(
id,
478 result.first.totalDataSize());
480 catch(std::out_of_range)
482 throwUnknownQueueid(
id);
491 if ( discardOldQueues_.at(
id.index())->deqNowait(result) )
492 consumerMonitorCollection_.addServedEventSample(
id,
493 result.first.totalDataSize());
495 catch(std::out_of_range)
497 throwUnknownQueueid(
id);
503 throwUnknownQueueid(
id);
515 if (!cid.
isValid())
return result;
520 IDLookup_t::const_iterator
i = queueIdLookup_.find(cid);
521 if (i == queueIdLookup_.end())
return result;
539 discardNewQueues_.at(
id.
index())->clear();
541 consumerMonitorCollection_.addDroppedEvents(
544 catch(std::out_of_range)
546 throwUnknownQueueid(
id);
556 discardOldQueues_.at(
id.
index())->clear();
558 consumerMonitorCollection_.addDroppedEvents(
561 catch(std::out_of_range)
563 throwUnknownQueueid(
id);
569 throwUnknownQueueid(
id);
584 const SizeType numQueues = discardNewQueues_.size();
587 if ( discardNewQueues_[
i]->clearIfStale(now, clearedEvents) )
589 consumerMonitorCollection_.addDroppedEvents(
599 const SizeType numQueues = discardOldQueues_.size();
602 if ( discardOldQueues_[
i]->clearIfStale(now, clearedEvents) )
604 consumerMonitorCollection_.addDroppedEvents(
621 const SizeType numQueues = discardNewQueues_.size();
625 discardNewQueues_[
i]->clear();
627 consumerMonitorCollection_.addDroppedEvents(
635 const SizeType numQueues = discardOldQueues_.size();
639 discardOldQueues_[
i]->clear();
641 consumerMonitorCollection_.addDroppedEvents(
661 result = discardNewQueues_.at(
id.
index())->empty();
663 catch(std::out_of_range)
665 throwUnknownQueueid(
id);
674 result = discardOldQueues_.at(
id.
index())->empty();
676 catch(std::out_of_range)
678 throwUnknownQueueid(
id);
684 throwUnknownQueueid(
id);
703 result = discardNewQueues_.at(
id.
index())->full();
705 catch(std::out_of_range)
707 throwUnknownQueueid(
id);
716 result = discardOldQueues_.at(
id.
index())->full();
718 catch(std::out_of_range)
720 throwUnknownQueueid(
id);
726 throwUnknownQueueid(
id);
745 result = discardNewQueues_.at(
id.
index())->stale(now);
747 catch(std::out_of_range)
749 throwUnknownQueueid(
id);
758 result = discardOldQueues_.at(
id.
index())->stale(now);
760 catch(std::out_of_range)
762 throwUnknownQueueid(
id);
768 throwUnknownQueueid(
id);
781 const SizeType numQueues = discardNewQueues_.size();
784 if ( ! discardNewQueues_[
i]->stale(now) )
return false;
789 const SizeType numQueues = discardOldQueues_.size();
792 if ( ! discardOldQueues_[
i]->stale(now) )
return false;
810 result = discardNewQueues_.at(
id.
index())->size();
812 catch(std::out_of_range)
814 throwUnknownQueueid(
id);
823 result = discardOldQueues_.at(
id.
index())->size();
825 catch(std::out_of_range)
827 throwUnknownQueueid(
id);
833 throwUnknownQueueid(
id );
855 return discardNewQueues_.at(
id.
index())->enqNowait(event,now);
857 catch(std::out_of_range)
859 throwUnknownQueueid(
id);
867 return discardOldQueues_.at(
id.
index())->enqNowait(event,now);
869 catch(std::out_of_range)
871 throwUnknownQueueid(
id);
877 throwUnknownQueueid(
id);
886 #endif // EventFilter_StorageManager_QueueCollection_h
TimePoint_t getCurrentTime()
DiscardOldQueues_t discardOldQueues_
DiscardNewQueues_t discardNewQueues_
utils::Duration_t getExpirationInterval(const QueueID &id) const
ReadWriteMutex_t protectDiscardOldQueues_
void clearQueue(const QueueID &)
boost::shared_ptr< RegistrationInfoBase > RegPtr
bool full(const QueueID &) const
ReadWriteMutex_t protectLookup_
static boost::mutex mutex
std::vector< QueueID > QueueIDs
SizeType enqueueEvent_(QueueID const &, T const &, utils::TimePoint_t const &)
std::vector< ExpirableDiscardNewQueuePtr > DiscardNewQueues_t
void setExpirationInterval(const QueueID &, const utils::Duration_t &)
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
IDLookup_t queueIdLookup_
std::map< EventConsRegPtr, QueueID, utils::ptrComp< EventConsumerRegistrationInfo > > ReginfoLookup_t
boost::posix_time::time_duration Duration_t
ExpirableQueue< T, RejectNewest< T > > ExpirableDiscardNewQueue_t
QueueCollection(ConsumerMonitorCollection &)
boost::posix_time::ptime TimePoint_t
ConsumerMonitorCollection & consumerMonitorCollection_
bool stale(const QueueID &, const utils::TimePoint_t &) const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
boost::mutex::scoped_lock WriteLock_t
boost::mutex ReadWriteMutex_t
ValueType popEvent(const QueueID &)
ExpirableQueue< T, RejectNewest< T > >::SizeType SizeType
ReadWriteMutex_t protectDiscardNewQueues_
boost::shared_ptr< ExpirableDiscardOldQueue_t > ExpirableDiscardOldQueuePtr
bool clearStaleQueues(const utils::TimePoint_t &)
ExpirableQueue< T, KeepNewest< T > > ExpirableDiscardOldQueue_t
bool empty(const QueueID &) const
QueueID createQueue(const EventConsRegPtr, const utils::TimePoint_t &now=utils::getCurrentTime())
QueueCollection & operator=(QueueCollection const &)
boost::mutex::scoped_lock ReadLock_t
ExpirableQueue< T, RejectNewest< T > >::ValueType ValueType
ReginfoLookup_t queueReginfoLookup_
std::map< ConsumerID, QueueID > IDLookup_t
QueueID getQueue(const RegPtr, const utils::TimePoint_t &)
std::vector< ExpirableDiscardOldQueuePtr > DiscardOldQueues_t
bool allQueuesStale(const utils::TimePoint_t &) const
boost::shared_ptr< ExpirableDiscardNewQueue_t > ExpirableDiscardNewQueuePtr