#include <ConcurrentQueue.h>
Public Types | |
typedef EnqPolicy::SequenceType | SequenceType |
typedef SequenceType::size_type | SizeType |
typedef EnqPolicy::ValueType | ValueType |
Public Member Functions | |
void | addExternallyDroppedEvents (SizeType) |
SizeType | capacity () const |
SizeType | clear () |
ConcurrentQueue (SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max()) | |
bool | deqNowait (ValueType &) |
bool | deqTimedWait (ValueType &, boost::posix_time::time_duration const &) |
void | deqWait (ValueType &) |
bool | empty () const |
EnqPolicy::ReturnType | enqNowait (T const &item) |
bool | enqTimedWait (T const &p, boost::posix_time::time_duration const &) |
void | enqWait (T const &p) |
bool | full () const |
detail::MemoryType | memory () const |
bool | setCapacity (SizeType n) |
bool | setMemory (detail::MemoryType n) |
SizeType | size () const |
detail::MemoryType | used () const |
~ConcurrentQueue () | |
Private Types | |
typedef boost::mutex::scoped_lock | LockType |
Private Member Functions | |
void | assignItem (T &item, const T &element) |
void | assignItem (std::pair< T, size_t > &item, const T &element) |
ConcurrentQueue (ConcurrentQueue< T, EnqPolicy > const &) | |
bool | insertIfPossible (T const &item) |
bool | isFull () const |
ConcurrentQueue & | operator= (ConcurrentQueue< T, EnqPolicy > const &) |
void | removeHead (ValueType &item) |
bool | removeHeadIfPossible (ValueType &item) |
Private Attributes | |
SizeType | capacity_ |
SequenceType | elements_ |
size_t | elementsDropped_ |
detail::MemoryType | memory_ |
boost::mutex | protectElements_ |
boost::condition | queueNotEmpty_ |
boost::condition | queueNotFull_ |
SizeType | size_ |
detail::MemoryType | used_ |
ConcurrentQueue<T> class template declaration.
Definition at line 290 of file ConcurrentQueue.h.
typedef boost::mutex::scoped_lock stor::ConcurrentQueue< T, EnqPolicy >::LockType [private] |
Definition at line 434 of file ConcurrentQueue.h.
typedef EnqPolicy::SequenceType stor::ConcurrentQueue< T, EnqPolicy >::SequenceType |
Definition at line 294 of file ConcurrentQueue.h.
typedef SequenceType::size_type stor::ConcurrentQueue< T, EnqPolicy >::SizeType |
Definition at line 295 of file ConcurrentQueue.h.
typedef EnqPolicy::ValueType stor::ConcurrentQueue< T, EnqPolicy >::ValueType |
Definition at line 293 of file ConcurrentQueue.h.
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue | ( | SizeType | maxSize = std::numeric_limits<SizeType>::max() , |
detail::MemoryType | maxMemory = std::numeric_limits<detail::MemoryType>::max() |
||
) | [explicit] |
ConcurrentQueue is always bounded. By default, the bound is absurdly large.
Definition at line 507 of file ConcurrentQueue.h.
: protectElements_(), elements_(), capacity_(maxSize), size_(0), memory_(maxMemory), used_(0), elementsDropped_(0) {}
stor::ConcurrentQueue< T, EnqPolicy >::~ConcurrentQueue | ( | ) |
Applications should arrange to make sure that the destructor of a ConcurrentQueue is not called while some other thread is using that queue. There is some protection against doing this, but it seems impossible to make sufficient protection.
Definition at line 521 of file ConcurrentQueue.h.
References CommonMethods::lock(), and edm::size_().
{ LockType lock(protectElements_); elements_.clear(); size_ = 0; used_ = 0; elementsDropped_ = 0; }
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue | ( | ConcurrentQueue< T, EnqPolicy > const & | ) | [private] |
void stor::ConcurrentQueue< T, EnqPolicy >::addExternallyDroppedEvents | ( | SizeType | n | ) |
Adds the passed count to the counter of dropped events
Definition at line 682 of file ConcurrentQueue.h.
References CommonMethods::lock(), and n.
{ LockType lock(protectElements_); elementsDropped_ += n; }
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem | ( | T & | item, |
const T & | element | ||
) | [private] |
Definition at line 736 of file ConcurrentQueue.h.
{ item = element; }
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem | ( | std::pair< T, size_t > & | item, |
const T & | element | ||
) | [private] |
Definition at line 743 of file ConcurrentQueue.h.
{ item.first = element; item.second = elementsDropped_; elementsDropped_ = 0; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity | ( | ) | const |
Return the capacity of the queue, that is, the maximum number of items it can contain.
Definition at line 625 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return capacity_; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::clear | ( | void | ) |
Remove all items from the queue. This changes the size to zero but does not change the capacity. Returns the number of cleared events.
Definition at line 669 of file ConcurrentQueue.h.
References CommonMethods::lock(), and edm::size_().
{ LockType lock(protectElements_); SizeType clearedEvents = size_; elementsDropped_ += size_; elements_.clear(); size_ = 0; used_ = 0; return clearedEvents; }
bool stor::ConcurrentQueue< T, EnqPolicy >::deqNowait | ( | ValueType & | item | ) |
Assign the value at the head of the queue to item and then remove the head of the queue. If successful, return true; on failure, return false. This function fill fail without waiting if the queue is empty. This function may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 568 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return removeHeadIfPossible(item); }
bool stor::ConcurrentQueue< T, EnqPolicy >::deqTimedWait | ( | ValueType & | item, |
boost::posix_time::time_duration const & | waitTime | ||
) |
Assign the value at the head of the queue to item and then remove the head of the queue. If the queue is empty wait until is has become non-empty or until timeDuration has passed. Return true if an item has been removed from the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 586 of file ConcurrentQueue.h.
References CommonMethods::lock(), and edm::size_().
{ LockType lock(protectElements_); if (size_ == 0) { queueNotEmpty_.timed_wait(lock, waitTime); } return removeHeadIfPossible(item); }
void stor::ConcurrentQueue< T, EnqPolicy >::deqWait | ( | ValueType & | item | ) |
Assign the value of the head of the queue to item and then remove the head of the queue. If the queue is empty wait until is has become non-empty. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 576 of file ConcurrentQueue.h.
References CommonMethods::lock(), and edm::size_().
{ LockType lock(protectElements_); while (size_ == 0) queueNotEmpty_.wait(lock); removeHead(item); }
bool stor::ConcurrentQueue< T, EnqPolicy >::empty | ( | void | ) | const |
Return true if the queue is empty, and false if it is not.
Definition at line 601 of file ConcurrentQueue.h.
References edm::size_().
{ // No lock is necessary: the read is atomic. return size_ == 0; }
EnqPolicy::ReturnType stor::ConcurrentQueue< T, EnqPolicy >::enqNowait | ( | T const & | item | ) |
Copying a ConcurrentQueue is illegal, as is asigning to a ConcurrentQueue. The copy constructor and copy assignment operator are both private and unimplemented. Add a copy if item to the queue, according to the rules determined by the EnqPolicy; see documentation above the the provided EnqPolicy choices. This may throw any exception thrown by the assignment operator of type T, or badAlloc.
Definition at line 532 of file ConcurrentQueue.h.
References CommonMethods::lock(), and edm::size_().
{ LockType lock(protectElements_); return EnqPolicy::doEnq (item, elements_, size_, capacity_, used_, memory_, elementsDropped_, queueNotEmpty_); }
bool stor::ConcurrentQueue< T, EnqPolicy >::enqTimedWait | ( | T const & | p, |
boost::posix_time::time_duration const & | waitTime | ||
) |
Add a copy of item to the queue. If the queue is full wait until it becomes non-full or until timeDuration has passed. Return true if the items has been put onto the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of T, or badAlloc.
Definition at line 553 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); if ( isFull() ) { queueNotFull_.timed_wait(lock, waitTime); } return insertIfPossible(item); }
void stor::ConcurrentQueue< T, EnqPolicy >::enqWait | ( | T const & | p | ) |
Add a copy of item to the queue. If the queue is full wait until it becomes non-full. This may throw any exception thrown by the assignment operator of type T, or badAlloc.
Definition at line 542 of file ConcurrentQueue.h.
References CommonMethods::lock(), stor::detail::memoryUsage(), and edm::size_().
{ LockType lock(protectElements_); while ( isFull() ) queueNotFull_.wait(lock); EnqPolicy::doInsert(item, elements_, size_, detail::memoryUsage(item), used_, queueNotEmpty_); }
bool stor::ConcurrentQueue< T, EnqPolicy >::full | ( | ) | const |
Return true if the queue is full, and false if it is not.
Definition at line 609 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return isFull(); }
bool stor::ConcurrentQueue< T, EnqPolicy >::insertIfPossible | ( | T const & | item | ) | [private] |
Definition at line 694 of file ConcurrentQueue.h.
References stor::detail::memoryUsage(), and edm::size_().
{ if ( isFull() ) { ++elementsDropped_; return false; } else { EnqPolicy::doInsert(item, elements_, size_, detail::memoryUsage(item), used_, queueNotEmpty_); return true; } }
bool stor::ConcurrentQueue< T, EnqPolicy >::isFull | ( | ) | const [private] |
Definition at line 752 of file ConcurrentQueue.h.
References edm::size_().
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory | ( | ) | const |
Return the memory of the queue in bytes, that is, the maximum memory the items in the queue may occupy
Definition at line 651 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return memory_; }
ConcurrentQueue& stor::ConcurrentQueue< T, EnqPolicy >::operator= | ( | ConcurrentQueue< T, EnqPolicy > const & | ) | [private] |
void stor::ConcurrentQueue< T, EnqPolicy >::removeHead | ( | ValueType & | item | ) | [private] |
Definition at line 721 of file ConcurrentQueue.h.
References stor::detail::memoryUsage(), and edm::size_().
{ SequenceType holder; // Move the item out of elements_ in a manner that will not throw. holder.splice(holder.begin(), elements_, elements_.begin()); // Record the change in the length of elements_. --size_; queueNotFull_.notify_one(); assignItem(item, holder.front()); used_ -= detail::memoryUsage( item ); }
bool stor::ConcurrentQueue< T, EnqPolicy >::removeHeadIfPossible | ( | ValueType & | item | ) | [private] |
Definition at line 711 of file ConcurrentQueue.h.
References edm::size_().
{ if (size_ == 0) return false; removeHead(item); return true; }
bool stor::ConcurrentQueue< T, EnqPolicy >::setCapacity | ( | SizeType | n | ) |
Reset the capacity of the queue. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.
Definition at line 633 of file ConcurrentQueue.h.
References CommonMethods::lock(), and edm::size_().
{ LockType lock(protectElements_); bool isEmpty = (size_ == 0); if (isEmpty) capacity_ = newcapacity; return isEmpty; }
bool stor::ConcurrentQueue< T, EnqPolicy >::setMemory | ( | detail::MemoryType | n | ) |
Reset the memory usage in bytes of the queue. A value of 0 disabled the memory check. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.
Definition at line 659 of file ConcurrentQueue.h.
References CommonMethods::lock(), and edm::size_().
{ LockType lock(protectElements_); bool isEmpty = (size_ == 0); if (isEmpty) memory_ = newmemory; return isEmpty; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::size | ( | void | ) | const |
Return the size of the queue, that is, the number of items it contains.
Definition at line 617 of file ConcurrentQueue.h.
References edm::size_().
{ // No lock is necessary: the read is atomic. return size_; }
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used | ( | ) | const |
Return the memory in bytes used by items in the queue
Definition at line 643 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return used_; }
SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity_ [private] |
Definition at line 441 of file ConcurrentQueue.h.
SequenceType stor::ConcurrentQueue< T, EnqPolicy >::elements_ [private] |
Definition at line 440 of file ConcurrentQueue.h.
size_t stor::ConcurrentQueue< T, EnqPolicy >::elementsDropped_ [private] |
Definition at line 452 of file ConcurrentQueue.h.
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory_ [private] |
Definition at line 450 of file ConcurrentQueue.h.
boost::mutex stor::ConcurrentQueue< T, EnqPolicy >::protectElements_ [mutable, private] |
Definition at line 436 of file ConcurrentQueue.h.
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotEmpty_ [mutable, private] |
Definition at line 437 of file ConcurrentQueue.h.
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotFull_ [mutable, private] |
Definition at line 438 of file ConcurrentQueue.h.
SizeType stor::ConcurrentQueue< T, EnqPolicy >::size_ [private] |
Definition at line 442 of file ConcurrentQueue.h.
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used_ [private] |
Definition at line 451 of file ConcurrentQueue.h.