#include <ConcurrentQueue.h>
Public Types | |
typedef EnqPolicy::SequenceType | SequenceType |
typedef SequenceType::size_type | SizeType |
typedef EnqPolicy::ValueType | ValueType |
Public Member Functions | |
void | addExternallyDroppedEvents (SizeType) |
SizeType | capacity () const |
SizeType | clear () |
ConcurrentQueue (SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max()) | |
bool | deqNowait (ValueType &) |
bool | deqTimedWait (ValueType &, boost::posix_time::time_duration const &) |
void | deqWait (ValueType &) |
bool | empty () const |
EnqPolicy::ReturnType | enqNowait (T const &item) |
bool | enqTimedWait (T const &p, boost::posix_time::time_duration const &) |
void | enqWait (T const &p) |
bool | full () const |
detail::MemoryType | memory () const |
bool | setCapacity (SizeType n) |
bool | setMemory (detail::MemoryType n) |
SizeType | size () const |
detail::MemoryType | used () const |
~ConcurrentQueue () | |
Private Types | |
typedef boost::mutex::scoped_lock | LockType |
Private Member Functions | |
void | assignItem (T &item, const T &element) |
void | assignItem (std::pair< T, size_t > &item, const T &element) |
ConcurrentQueue (ConcurrentQueue< T, EnqPolicy > const &) | |
bool | insertIfPossible (T const &item) |
bool | isFull () const |
ConcurrentQueue & | operator= (ConcurrentQueue< T, EnqPolicy > const &) |
void | removeHead (ValueType &item) |
bool | removeHeadIfPossible (ValueType &item) |
Private Attributes | |
SizeType | capacity_ |
SequenceType | elements_ |
size_t | elementsDropped_ |
detail::MemoryType | memory_ |
boost::mutex | protectElements_ |
boost::condition | queueNotEmpty_ |
boost::condition | queueNotFull_ |
SizeType | size_ |
detail::MemoryType | used_ |
ConcurrentQueue<T> class template declaration.
Definition at line 288 of file ConcurrentQueue.h.
typedef boost::mutex::scoped_lock stor::ConcurrentQueue< T, EnqPolicy >::LockType [private] |
Definition at line 432 of file ConcurrentQueue.h.
typedef EnqPolicy::SequenceType stor::ConcurrentQueue< T, EnqPolicy >::SequenceType |
Definition at line 292 of file ConcurrentQueue.h.
typedef SequenceType::size_type stor::ConcurrentQueue< T, EnqPolicy >::SizeType |
Definition at line 293 of file ConcurrentQueue.h.
typedef EnqPolicy::ValueType stor::ConcurrentQueue< T, EnqPolicy >::ValueType |
Definition at line 291 of file ConcurrentQueue.h.
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue | ( | SizeType | maxSize = std::numeric_limits<SizeType>::max() , |
detail::MemoryType | maxMemory = std::numeric_limits<detail::MemoryType>::max() |
||
) | [explicit] |
ConcurrentQueue is always bounded. By default, the bound is absurdly large.
Definition at line 505 of file ConcurrentQueue.h.
: protectElements_(), elements_(), capacity_(maxSize), size_(0), memory_(maxMemory), used_(0), elementsDropped_(0) {}
stor::ConcurrentQueue< T, EnqPolicy >::~ConcurrentQueue | ( | ) |
Applications should arrange to make sure that the destructor of a ConcurrentQueue is not called while some other thread is using that queue. There is some protection against doing this, but it seems impossible to make sufficient protection.
Definition at line 519 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); elements_.clear(); size_ = 0; used_ = 0; elementsDropped_ = 0; }
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue | ( | ConcurrentQueue< T, EnqPolicy > const & | ) | [private] |
void stor::ConcurrentQueue< T, EnqPolicy >::addExternallyDroppedEvents | ( | SizeType | n | ) |
Adds the passed count to the counter of dropped events
Definition at line 680 of file ConcurrentQueue.h.
References CommonMethods::lock(), and n.
{ LockType lock(protectElements_); elementsDropped_ += n; }
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem | ( | T & | item, |
const T & | element | ||
) | [private] |
Definition at line 734 of file ConcurrentQueue.h.
{ item = element; }
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem | ( | std::pair< T, size_t > & | item, |
const T & | element | ||
) | [private] |
Definition at line 741 of file ConcurrentQueue.h.
{ item.first = element; item.second = elementsDropped_; elementsDropped_ = 0; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity | ( | ) | const |
Return the capacity of the queue, that is, the maximum number of items it can contain.
Definition at line 623 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return capacity_; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::clear | ( | void | ) |
Remove all items from the queue. This changes the size to zero but does not change the capacity. Returns the number of cleared events.
Definition at line 667 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); SizeType clearedEvents = size_; elementsDropped_ += size_; elements_.clear(); size_ = 0; used_ = 0; return clearedEvents; }
bool stor::ConcurrentQueue< T, EnqPolicy >::deqNowait | ( | ValueType & | item | ) |
Assign the value at the head of the queue to item and then remove the head of the queue. If successful, return true; on failure, return false. This function fill fail without waiting if the queue is empty. This function may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 566 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return removeHeadIfPossible(item); }
bool stor::ConcurrentQueue< T, EnqPolicy >::deqTimedWait | ( | ValueType & | item, |
boost::posix_time::time_duration const & | waitTime | ||
) |
Assign the value at the head of the queue to item and then remove the head of the queue. If the queue is empty wait until is has become non-empty or until timeDuration has passed. Return true if an item has been removed from the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 584 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); if (size_ == 0) { queueNotEmpty_.timed_wait(lock, waitTime); } return removeHeadIfPossible(item); }
void stor::ConcurrentQueue< T, EnqPolicy >::deqWait | ( | ValueType & | item | ) |
Assign the value of the head of the queue to item and then remove the head of the queue. If the queue is empty wait until is has become non-empty. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 574 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); while (size_ == 0) queueNotEmpty_.wait(lock); removeHead(item); }
bool stor::ConcurrentQueue< T, EnqPolicy >::empty | ( | void | ) | const |
Return true if the queue is empty, and false if it is not.
Definition at line 599 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return size_ == 0; }
EnqPolicy::ReturnType stor::ConcurrentQueue< T, EnqPolicy >::enqNowait | ( | T const & | item | ) |
Copying a ConcurrentQueue is illegal, as is asigning to a ConcurrentQueue. The copy constructor and copy assignment operator are both private and unimplemented. Add a copy if item to the queue, according to the rules determined by the EnqPolicy; see documentation above the the provided EnqPolicy choices. This may throw any exception thrown by the assignment operator of type T, or badAlloc.
Definition at line 530 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return EnqPolicy::doEnq (item, elements_, size_, capacity_, used_, memory_, elementsDropped_, queueNotEmpty_); }
bool stor::ConcurrentQueue< T, EnqPolicy >::enqTimedWait | ( | T const & | p, |
boost::posix_time::time_duration const & | waitTime | ||
) |
Add a copy of item to the queue. If the queue is full wait until it becomes non-full or until timeDuration has passed. Return true if the items has been put onto the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of T, or badAlloc.
Definition at line 551 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); if ( isFull() ) { queueNotFull_.timed_wait(lock, waitTime); } return insertIfPossible(item); }
void stor::ConcurrentQueue< T, EnqPolicy >::enqWait | ( | T const & | p | ) |
Add a copy of item to the queue. If the queue is full wait until it becomes non-full. This may throw any exception thrown by the assignment operator of type T, or badAlloc.
Definition at line 540 of file ConcurrentQueue.h.
References CommonMethods::lock(), and stor::detail::memoryUsage().
{ LockType lock(protectElements_); while ( isFull() ) queueNotFull_.wait(lock); EnqPolicy::doInsert(item, elements_, size_, detail::memoryUsage(item), used_, queueNotEmpty_); }
bool stor::ConcurrentQueue< T, EnqPolicy >::full | ( | ) | const |
Return true if the queue is full, and false if it is not.
Definition at line 607 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return isFull(); }
bool stor::ConcurrentQueue< T, EnqPolicy >::insertIfPossible | ( | T const & | item | ) | [private] |
Definition at line 692 of file ConcurrentQueue.h.
References stor::detail::memoryUsage().
{ if ( isFull() ) { ++elementsDropped_; return false; } else { EnqPolicy::doInsert(item, elements_, size_, detail::memoryUsage(item), used_, queueNotEmpty_); return true; } }
bool stor::ConcurrentQueue< T, EnqPolicy >::isFull | ( | ) | const [private] |
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory | ( | ) | const |
Return the memory of the queue in bytes, that is, the maximum memory the items in the queue may occupy
Definition at line 649 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return memory_; }
ConcurrentQueue& stor::ConcurrentQueue< T, EnqPolicy >::operator= | ( | ConcurrentQueue< T, EnqPolicy > const & | ) | [private] |
void stor::ConcurrentQueue< T, EnqPolicy >::removeHead | ( | ValueType & | item | ) | [private] |
Definition at line 719 of file ConcurrentQueue.h.
References stor::detail::memoryUsage().
{ SequenceType holder; // Move the item out of elements_ in a manner that will not throw. holder.splice(holder.begin(), elements_, elements_.begin()); // Record the change in the length of elements_. --size_; queueNotFull_.notify_one(); assignItem(item, holder.front()); used_ -= detail::memoryUsage( item ); }
bool stor::ConcurrentQueue< T, EnqPolicy >::removeHeadIfPossible | ( | ValueType & | item | ) | [private] |
Definition at line 709 of file ConcurrentQueue.h.
{ if (size_ == 0) return false; removeHead(item); return true; }
bool stor::ConcurrentQueue< T, EnqPolicy >::setCapacity | ( | SizeType | n | ) |
Reset the capacity of the queue. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.
Definition at line 631 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); bool isEmpty = (size_ == 0); if (isEmpty) capacity_ = newcapacity; return isEmpty; }
bool stor::ConcurrentQueue< T, EnqPolicy >::setMemory | ( | detail::MemoryType | n | ) |
Reset the memory usage in bytes of the queue. A value of 0 disabled the memory check. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.
Definition at line 657 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); bool isEmpty = (size_ == 0); if (isEmpty) memory_ = newmemory; return isEmpty; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::size | ( | void | ) | const |
Return the size of the queue, that is, the number of items it contains.
Definition at line 615 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return size_; }
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used | ( | ) | const |
Return the memory in bytes used by items in the queue
Definition at line 641 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return used_; }
SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity_ [private] |
Definition at line 439 of file ConcurrentQueue.h.
SequenceType stor::ConcurrentQueue< T, EnqPolicy >::elements_ [private] |
Definition at line 438 of file ConcurrentQueue.h.
size_t stor::ConcurrentQueue< T, EnqPolicy >::elementsDropped_ [private] |
Definition at line 450 of file ConcurrentQueue.h.
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory_ [private] |
Definition at line 448 of file ConcurrentQueue.h.
boost::mutex stor::ConcurrentQueue< T, EnqPolicy >::protectElements_ [mutable, private] |
Definition at line 434 of file ConcurrentQueue.h.
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotEmpty_ [mutable, private] |
Definition at line 435 of file ConcurrentQueue.h.
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotFull_ [mutable, private] |
Definition at line 436 of file ConcurrentQueue.h.
SizeType stor::ConcurrentQueue< T, EnqPolicy >::size_ [private] |
Definition at line 440 of file ConcurrentQueue.h.
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used_ [private] |
Definition at line 449 of file ConcurrentQueue.h.