#include <QueueCollection.h>
Class template QueueCollection provides a collection of ConcurrentQueue<T>.
The class T must implement a method getEventConsumerTags() const, returning a std::vector<QueueID> that lists the QueueIDs of the queues to which the object should be added.
Definition at line 44 of file QueueCollection.h.
typedef std::vector<ExpirableDiscardNewQueuePtr> stor::QueueCollection< T >::DiscardNewQueues_t [private] |
Definition at line 170 of file QueueCollection.h.
typedef std::vector<ExpirableDiscardOldQueuePtr> stor::QueueCollection< T >::DiscardOldQueues_t [private] |
Definition at line 172 of file QueueCollection.h.
typedef ExpirableQueue<T, RejectNewest<T> > stor::QueueCollection< T >::ExpirableDiscardNewQueue_t [private] |
Definition at line 151 of file QueueCollection.h.
typedef boost::shared_ptr<ExpirableDiscardNewQueue_t> stor::QueueCollection< T >::ExpirableDiscardNewQueuePtr [private] |
Definition at line 155 of file QueueCollection.h.
typedef ExpirableQueue<T, KeepNewest<T> > stor::QueueCollection< T >::ExpirableDiscardOldQueue_t [private] |
Definition at line 152 of file QueueCollection.h.
typedef boost::shared_ptr<ExpirableDiscardOldQueue_t> stor::QueueCollection< T >::ExpirableDiscardOldQueuePtr [private] |
Definition at line 157 of file QueueCollection.h.
typedef std::map<ConsumerID, QueueID> stor::QueueCollection< T >::IDLookup_t [private] |
Definition at line 175 of file QueueCollection.h.
typedef boost::mutex::scoped_lock stor::QueueCollection< T >::ReadLock_t [private] |
Definition at line 160 of file QueueCollection.h.
typedef boost::mutex stor::QueueCollection< T >::ReadWriteMutex_t [private] |
Definition at line 162 of file QueueCollection.h.
typedef std::map<EventConsRegPtr, QueueID, utils::ptrComp<EventConsumerRegistrationInfo> > stor::QueueCollection< T >::ReginfoLookup_t [private] |
Definition at line 179 of file QueueCollection.h.
typedef ExpirableQueue<T, RejectNewest<T> >::SizeType stor::QueueCollection< T >::SizeType |
Definition at line 47 of file QueueCollection.h.
typedef ExpirableQueue<T, RejectNewest<T> >::ValueType stor::QueueCollection< T >::ValueType |
Definition at line 48 of file QueueCollection.h.
typedef boost::mutex::scoped_lock stor::QueueCollection< T >::WriteLock_t [private] |
Definition at line 161 of file QueueCollection.h.
stor::QueueCollection< T >::QueueCollection | ( | ConsumerMonitorCollection & | ccp | ) |
A newly constructed QueueCollection contains no queues.
Definition at line 221 of file QueueCollection.h.
// All mutexes and containers are value-initialized, so the collection starts
// without any queue or lookup entry; only the reference to the consumer
// monitor collection comes from the caller. Initializers are listed in member
// declaration order.
: protectDiscardNewQueues_(),
  protectDiscardOldQueues_(),
  protectLookup_(),
  discardNewQueues_(),
  discardOldQueues_(),
  queueIdLookup_(),
  queueReginfoLookup_(),
  consumerMonitorCollection_( ccp )
{ }
stor::QueueCollection< T >::QueueCollection | ( | QueueCollection< T > const & | ) | [private] |
void stor::QueueCollection< T >::addEvent | ( | T const & | event | ) |
Add an event to all queues matching the specifications.
Definition at line 445 of file QueueCollection.h.
References stor::utils::getCurrentTime().
{
  // Both queue-vector mutexes are held for the whole routing pass;
  // enqueueEvent_() does not take any lock itself.
  ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
  ReadLock_t lockDiscardOld(protectDiscardOldQueues_);

  // A single timestamp is used for every queue the event is routed to.
  utils::TimePoint_t now = utils::getCurrentTime();
  QueueIDs routes = event.getEventConsumerTags();

  QueueIDs::const_iterator route = routes.begin();
  const QueueIDs::const_iterator lastRoute = routes.end();
  while ( route != lastRoute )
  {
    // A non-zero return value is reported as dropped events for this queue;
    // otherwise the event is recorded as queued with its total data size.
    const SizeType droppedEvents = enqueueEvent_( *route, event, now );
    if ( droppedEvents > 0 )
    {
      consumerMonitorCollection_.addDroppedEvents( *route, droppedEvents );
    }
    else
    {
      consumerMonitorCollection_.addQueuedEventSample( *route, event.totalDataSize() );
    }
    ++route;
  }
}
bool stor::QueueCollection< T >::allQueuesStale | ( | const utils::TimePoint_t & | now | ) | const |
Returns true if all queues are stale at the given time.
Definition at line 777 of file QueueCollection.h.
References i, and CommonMethods::lock().
{
  // The two policy groups are checked one after the other, each under its own
  // mutex. The first queue that is not stale at `now` ends the search.
  {
    ReadLock_t guard(protectDiscardNewQueues_);
    const SizeType count = discardNewQueues_.size();
    SizeType idx = 0;
    while ( idx < count )
    {
      const bool isStale = discardNewQueues_[idx]->stale(now);
      if ( !isStale ) return false;
      ++idx;
    }
  }
  {
    ReadLock_t guard(protectDiscardOldQueues_);
    const SizeType count = discardOldQueues_.size();
    SizeType idx = 0;
    while ( idx < count )
    {
      const bool isStale = discardOldQueues_[idx]->stale(now);
      if ( !isStale ) return false;
      ++idx;
    }
  }
  // No queue objected (this includes the case of an empty collection).
  return true;
}
void stor::QueueCollection< T >::clearQueue | ( | const QueueID & | id | ) |
Clear the queue with the given QueueID.
Definition at line 529 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), and evf::utils::policy.
{
  // Clears the queue addressed by `id` and reports the number of cleared
  // events to the consumer monitor collection as dropped events.
  // The policy of the id selects the queue vector, its index the element.
  // An unknown policy or an out-of-range index ends in throwUnknownQueueid().
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        const SizeType clearedEvents =
          discardNewQueues_.at(id.index())->clear();
        consumerMonitorCollection_.addDroppedEvents( id, clearedEvents );
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        const SizeType clearedEvents =
          discardOldQueues_.at(id.index())->clear();
        consumerMonitorCollection_.addDroppedEvents( id, clearedEvents );
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
}
void stor::QueueCollection< T >::clearQueues | ( | ) |
Clear all the contained queues.
Definition at line 617 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, i, and CommonMethods::lock().
{
  // Each policy group is emptied under its own mutex. Whatever a queue's
  // clear() returns is reported as dropped events for the QueueID built
  // from the group's policy and the queue's position in the vector.
  {
    ReadLock_t guard(protectDiscardNewQueues_);
    const SizeType count = discardNewQueues_.size();
    for (SizeType idx = 0; idx < count; ++idx)
    {
      const SizeType removed = discardNewQueues_[idx]->clear();
      const QueueID qid(enquing_policy::DiscardNew, idx);
      consumerMonitorCollection_.addDroppedEvents( qid, removed );
    }
  }
  {
    ReadLock_t guard(protectDiscardOldQueues_);
    const SizeType count = discardOldQueues_.size();
    for (SizeType idx = 0; idx < count; ++idx)
    {
      const SizeType removed = discardOldQueues_[idx]->clear();
      const QueueID qid(enquing_policy::DiscardOld, idx);
      consumerMonitorCollection_.addDroppedEvents( qid, removed );
    }
  }
}
bool stor::QueueCollection< T >::clearStaleQueues | ( | const utils::TimePoint_t & | now | ) |
Clear all queues which are stale at the specified point in time.
Definition at line 577 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, i, CommonMethods::lock(), and query::result.
{
  // Asks every queue to clear itself if it is stale at `now`. For each queue
  // that did, the number of cleared events is reported as dropped events.
  // Returns true if at least one queue was cleared.
  bool result(false);
  {
    ReadLock_t lock(protectDiscardNewQueues_);
    const SizeType numQueues = discardNewQueues_.size();
    for (SizeType i = 0; i < numQueues; ++i)
    {
      // Out-parameter of clearIfStale(); initialized so that it never
      // carries an indeterminate or left-over value.
      SizeType clearedEvents = 0;
      if ( discardNewQueues_[i]->clearIfStale(now, clearedEvents) )
      {
        consumerMonitorCollection_.addDroppedEvents(
          QueueID(enquing_policy::DiscardNew, i), clearedEvents );
        result = true;
      }
    }
  }
  {
    ReadLock_t lock(protectDiscardOldQueues_);
    const SizeType numQueues = discardOldQueues_.size();
    for (SizeType i = 0; i < numQueues; ++i)
    {
      SizeType clearedEvents = 0;
      if ( discardOldQueues_[i]->clearIfStale(now, clearedEvents) )
      {
        consumerMonitorCollection_.addDroppedEvents(
          QueueID(enquing_policy::DiscardOld, i), clearedEvents );
        result = true;
      }
    }
  }
  return result;
}
QueueID stor::QueueCollection< T >::createQueue | ( | const RegPtr | reginfo, |
const utils::TimePoint_t & | now = utils::getCurrentTime() |
||
) |
Definition at line 356 of file QueueCollection.h.
References stor::ConsumerID::isValid().
{
  // Creates a queue for the consumer described by `reginfo` and remembers
  // the ConsumerID -> QueueID association. The returned QueueID is
  // default-constructed if no queue was created by this call.
  QueueID qid;
  const ConsumerID& cid = reginfo->consumerId();

  // We don't proceed if the given ConsumerID is invalid, or if
  // we've already seen that value before.
  if (!cid.isValid()) return qid;

  // Held until return, i.e. also while getQueue() locks a queue vector.
  WriteLock_t lockLookup(protectLookup_);
  if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;

  qid = getQueue(reginfo, now);
  queueIdLookup_[cid] = qid;
  return qid;
}
QueueID stor::QueueCollection< T >::createQueue | ( | const EventConsRegPtr | reginfo, |
const utils::TimePoint_t & | now = utils::getCurrentTime() |
||
) |
Create a new contained queue, with the given policy and given maximum size. It returns a unique identifier to later identify requests originating from this consumer.
Definition at line 319 of file QueueCollection.h.
References stor::ConsumerID::isValid().
{
  // Creates (or, for consumers requesting unique events, possibly shares) a
  // queue for the event consumer described by `reginfo`. The returned
  // QueueID is default-constructed if the ConsumerID is invalid or was
  // already registered.
  QueueID qid;
  const ConsumerID& cid = reginfo->consumerId();

  // We don't proceed if the given ConsumerID is invalid, or if
  // we've already seen that value before.
  if (!cid.isValid()) return qid;

  // Held until return, i.e. also while getQueue() locks a queue vector.
  WriteLock_t lockLookup(protectLookup_);
  if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;

  if ( reginfo->uniqueEvents() )
  {
    // another consumer wants to share the
    // queue to get unique events.
    ReginfoLookup_t::const_iterator it =
      queueReginfoLookup_.find(reginfo);
    if ( it != queueReginfoLookup_.end() )
    {
      // A matching registration already owns a queue: reuse its QueueID.
      qid = it->second;
      queueIdLookup_[cid] = qid;
      return qid;
    }
  }

  qid = getQueue(reginfo, now);
  queueIdLookup_[cid] = qid;
  queueReginfoLookup_[reginfo] = qid;
  return qid;
}
bool stor::QueueCollection< T >::empty | ( | const QueueID & | id | ) | const |
Test to see if the queue with the given QueueID is empty.
Definition at line 651 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.
{
  // Forwards to empty() of the queue addressed by `id`, under the mutex of
  // the matching queue vector. An unknown policy or an out-of-range index
  // ends in throwUnknownQueueid().
  bool result(true);
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        result = discardNewQueues_.at(id.index())->empty();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        result = discardOldQueues_.at(id.index())->empty();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
  return result;
}
QueueCollection< T >::SizeType stor::QueueCollection< T >::enqueueEvent_ | ( | QueueID const & | id, |
T const & | event, | ||
utils::TimePoint_t const & | now | ||
) | [private] |
Definition at line 843 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, and evf::utils::policy.
{
  // Hands `event` to enqNowait() of the queue addressed by `id` and returns
  // that call's result. No mutex is taken here; addEvent() acquires both
  // queue-vector mutexes before calling this method.
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      try
      {
        return discardNewQueues_.at(id.index())->enqNowait(event,now);
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      try
      {
        return discardOldQueues_.at(id.index())->enqNowait(event,now);
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
  return 1; // event could not be entered
}
bool stor::QueueCollection< T >::full | ( | const QueueID & | id | ) | const |
Test to see if the queue with the given QueueID is full.
Definition at line 693 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.
{
  // Forwards to full() of the queue addressed by `id`, under the mutex of
  // the matching queue vector. An unknown policy or an out-of-range index
  // ends in throwUnknownQueueid().
  bool result(true);
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        result = discardNewQueues_.at(id.index())->full();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        result = discardOldQueues_.at(id.index())->full();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
  return result;
}
utils::Duration_t stor::QueueCollection< T >::getExpirationInterval | ( | const QueueID & | id | ) | const |
Definition at line 276 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, query::result, and seconds().
{
  // Returns stalenessInterval() of the queue addressed by `id`, read under
  // the mutex of the matching queue vector. An unknown policy or an
  // out-of-range index ends in throwUnknownQueueid().
  utils::Duration_t result = boost::posix_time::seconds(0);
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        result = discardNewQueues_.at(id.index())->stalenessInterval();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        result = discardOldQueues_.at(id.index())->stalenessInterval();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
  return result;
}
QueueID stor::QueueCollection< T >::getQueue | ( | const RegPtr | reginfo, |
const utils::TimePoint_t & | now | ||
) | [private] |
Definition at line 377 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, CommonMethods::lock(), and cmsPerfSuiteHarvest::now.
{
  // Appends a new queue of the policy requested by `reginfo` to the matching
  // vector, under that vector's mutex, and returns a QueueID made of the
  // policy and the index of the element just appended. The queue is built
  // from reginfo's queue size and seconds-to-stale plus `now`.
  if (reginfo->queuePolicy() == enquing_policy::DiscardNew)
  {
    WriteLock_t guard(protectDiscardNewQueues_);
    ExpirableDiscardNewQueuePtr created(
      new ExpirableDiscardNewQueue_t(
        reginfo->queueSize(),
        reginfo->secondsToStale(),
        now
      )
    );
    discardNewQueues_.push_back(created);
    return QueueID(
      enquing_policy::DiscardNew,
      discardNewQueues_.size()-1
    );
  }

  if (reginfo->queuePolicy() == enquing_policy::DiscardOld)
  {
    WriteLock_t guard(protectDiscardOldQueues_);
    ExpirableDiscardOldQueuePtr created(
      new ExpirableDiscardOldQueue_t(
        reginfo->queueSize(),
        reginfo->secondsToStale(),
        now
      )
    );
    discardOldQueues_.push_back(created);
    return QueueID(
      enquing_policy::DiscardOld,
      discardOldQueues_.size()-1
    );
  }

  // Any other policy: no queue is created.
  return QueueID();
}
QueueCollection& stor::QueueCollection< T >::operator= | ( | QueueCollection< T > const & | ) | [private] |
QueueCollection< T >::ValueType stor::QueueCollection< T >::popEvent | ( | const QueueID & | id | ) |
Remove and return an event from the queue for the consumer with the given id. If there is no event in that queue, an empty event is returned.
Definition at line 466 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, CommonMethods::lock(), evf::utils::policy, and query::result.
{
  // Tries to dequeue one element from the queue addressed by `id` without
  // waiting. If deqNowait() succeeds, the served event's total data size is
  // recorded for `id`; otherwise the default-constructed result is returned.
  // An unknown policy or an out-of-range index ends in throwUnknownQueueid().
  ValueType result;
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        if ( discardNewQueues_.at(id.index())->deqNowait(result) )
          consumerMonitorCollection_.addServedEventSample(
            id, result.first.totalDataSize());
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        if ( discardOldQueues_.at(id.index())->deqNowait(result) )
          consumerMonitorCollection_.addServedEventSample(
            id, result.first.totalDataSize());
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
  return result;
}
QueueCollection< T >::ValueType stor::QueueCollection< T >::popEvent | ( | const ConsumerID & | cid | ) |
Remove and return an event from the queue for the consumer with the given ConsumerID. If there is no event in that queue, an empty event is returned.
Definition at line 512 of file QueueCollection.h.
References i, stor::ConsumerID::isValid(), CommonMethods::lock(), and query::result.
{
  // Translates the ConsumerID into its QueueID and delegates to
  // popEvent(const QueueID&). An invalid or unregistered ConsumerID yields
  // the default-constructed result.
  ValueType result;
  if (!cid.isValid()) return result;

  QueueID id;
  {
    // Scope to control lifetime of lock.
    // protectLookup_ is released before the queue itself is accessed.
    ReadLock_t lock(protectLookup_);
    IDLookup_t::const_iterator i = queueIdLookup_.find(cid);
    if (i == queueIdLookup_.end()) return result;
    id = i->second;
  }
  return popEvent(id);
}
void stor::QueueCollection< T >::removeQueues | ( | ) |
Remove all contained queues. Note that this has the effect of clearing all the queues as well.
Definition at line 419 of file QueueCollection.h.
{
  // Removes every queue and forgets all consumer/registration associations.
  // clearQueues() runs first, so the events still queued are reported as
  // dropped before the queues themselves are destroyed.
  clearQueues();

  // Lock order: lookup mutex first, then the queue-vector mutexes. Both
  // createQueue() overloads hold protectLookup_ while getQueue() acquires a
  // queue-vector mutex; acquiring them in the opposite order here could
  // deadlock against a concurrent createQueue().
  WriteLock_t lockLookup(protectLookup_);
  WriteLock_t lockDiscardNew(protectDiscardNewQueues_);
  WriteLock_t lockDiscardOld(protectDiscardOldQueues_);

  discardNewQueues_.clear();
  discardOldQueues_.clear();

  queueIdLookup_.clear();
  // The registration lookup holds QueueIDs of the queues just removed.
  // Leaving it populated would let createQueue() return such a stale
  // QueueID to a consumer requesting unique events.
  queueReginfoLookup_.clear();
}
void stor::QueueCollection< T >::setExpirationInterval | ( | const QueueID & | id, |
const utils::Duration_t & | interval | ||
) |
Set or get the time in seconds that the queue with the given id can be unused (by a consumer) before becoming stale.
Definition at line 233 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), and evf::utils::policy.
{
  // Passes `interval` to setStalenessInterval() of the queue addressed by
  // `id`, under the mutex of the matching queue vector. An unknown policy or
  // an out-of-range index ends in throwUnknownQueueid().
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        discardNewQueues_.at(id.index())->setStalenessInterval(interval);
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        discardOldQueues_.at(id.index())->setStalenessInterval(interval);
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
}
QueueCollection< T >::SizeType stor::QueueCollection< T >::size | ( | const QueueID & | id | ) | const |
Get the number of elements in the queue with the given QueueID.
Definition at line 800 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.
{
  // Returns size() of the queue addressed by `id`, read under the mutex of
  // the matching queue vector. An unknown policy or an out-of-range index
  // ends in throwUnknownQueueid().
  SizeType result = 0;
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        result = discardNewQueues_.at(id.index())->size();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        result = discardOldQueues_.at(id.index())->size();
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid( id );
      // does not return, no break needed
    }
  }
  return result;
}
QueueCollection< T >::SizeType stor::QueueCollection< T >::size | ( | void | ) | const |
Return the number of queues in the collection.
Definition at line 434 of file QueueCollection.h.
{
  // Returns the total number of queues over both policy groups.
  //
  // We obtain locks not because it is unsafe to read the sizes
  // without locking, but because we want consistent values.
  ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
  ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
  return discardNewQueues_.size() + discardOldQueues_.size();
}
bool stor::QueueCollection< T >::stale | ( | const QueueID & | id, |
const utils::TimePoint_t & | now | ||
) | const |
Test to see if the queue with the given QueueID is stale at the given time.
Definition at line 735 of file QueueCollection.h.
References stor::enquing_policy::DiscardNew, stor::enquing_policy::DiscardOld, getHLTprescales::index, CommonMethods::lock(), evf::utils::policy, and query::result.
{
  // Forwards to stale(now) of the queue addressed by `id`, under the mutex
  // of the matching queue vector. An unknown policy or an out-of-range index
  // ends in throwUnknownQueueid().
  bool result(true);
  switch (id.policy())
  {
    case enquing_policy::DiscardNew:
    {
      ReadLock_t lock(protectDiscardNewQueues_);
      try
      {
        result = discardNewQueues_.at(id.index())->stale(now);
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    case enquing_policy::DiscardOld:
    {
      ReadLock_t lock(protectDiscardOldQueues_);
      try
      {
        result = discardOldQueues_.at(id.index())->stale(now);
      }
      catch(const std::out_of_range&)
      {
        throwUnknownQueueid(id);
      }
      break;
    }
    default:
    {
      throwUnknownQueueid(id);
      // does not return, no break needed
    }
  }
  return result;
}
ConsumerMonitorCollection& stor::QueueCollection< T >::consumerMonitorCollection_ [private] |
Definition at line 181 of file QueueCollection.h.
DiscardNewQueues_t stor::QueueCollection< T >::discardNewQueues_ [private] |
Definition at line 171 of file QueueCollection.h.
DiscardOldQueues_t stor::QueueCollection< T >::discardOldQueues_ [private] |
Definition at line 173 of file QueueCollection.h.
ReadWriteMutex_t stor::QueueCollection< T >::protectDiscardNewQueues_ [mutable, private] |
Definition at line 166 of file QueueCollection.h.
ReadWriteMutex_t stor::QueueCollection< T >::protectDiscardOldQueues_ [mutable, private] |
Definition at line 167 of file QueueCollection.h.
ReadWriteMutex_t stor::QueueCollection< T >::protectLookup_ [mutable, private] |
Definition at line 168 of file QueueCollection.h.
IDLookup_t stor::QueueCollection< T >::queueIdLookup_ [private] |
Definition at line 176 of file QueueCollection.h.
ReginfoLookup_t stor::QueueCollection< T >::queueReginfoLookup_ [private] |
Definition at line 180 of file QueueCollection.h.