#include <ConcurrentQueue.h>
Public Types | |
typedef EnqPolicy::SequenceType | SequenceType |
typedef SequenceType::size_type | SizeType |
typedef EnqPolicy::ValueType | ValueType |
Public Member Functions | |
void | addExternallyDroppedEvents (SizeType) |
SizeType | capacity () const |
SizeType | clear () |
ConcurrentQueue (SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max()) | |
bool | deqNowait (ValueType &) |
bool | deqTimedWait (ValueType &, boost::posix_time::time_duration const &) |
void | deqWait (ValueType &) |
bool | empty () const |
EnqPolicy::ReturnType | enqNowait (T const &item) |
bool | enqTimedWait (T const &p, boost::posix_time::time_duration const &) |
void | enqWait (T const &p) |
bool | full () const |
detail::MemoryType | memory () const |
bool | setCapacity (SizeType n) |
bool | setMemory (detail::MemoryType n) |
SizeType | size () const |
detail::MemoryType | used () const |
~ConcurrentQueue () | |
Private Types | |
typedef boost::mutex::scoped_lock | LockType |
Private Member Functions | |
void | assignItem (T &item, const T &element) |
void | assignItem (std::pair< T, size_t > &item, const T &element) |
ConcurrentQueue (ConcurrentQueue< T, EnqPolicy > const &) | |
bool | insertIfPossible (T const &item) |
bool | isFull () const |
ConcurrentQueue & | operator= (ConcurrentQueue< T, EnqPolicy > const &) |
void | removeHead (ValueType &item) |
bool | removeHeadIfPossible (ValueType &item) |
Private Attributes | |
SizeType | capacity_ |
SequenceType | elements_ |
size_t | elementsDropped_ |
detail::MemoryType | memory_ |
boost::mutex | protectElements_ |
boost::condition | queueNotEmpty_ |
boost::condition | queueNotFull_ |
SizeType | size_ |
detail::MemoryType | used_ |
ConcurrentQueue<T> class template declaration.
Definition at line 289 of file ConcurrentQueue.h.
typedef boost::mutex::scoped_lock stor::ConcurrentQueue< T, EnqPolicy >::LockType [private] |
Definition at line 433 of file ConcurrentQueue.h.
typedef EnqPolicy::SequenceType stor::ConcurrentQueue< T, EnqPolicy >::SequenceType |
Definition at line 293 of file ConcurrentQueue.h.
typedef SequenceType::size_type stor::ConcurrentQueue< T, EnqPolicy >::SizeType |
Definition at line 294 of file ConcurrentQueue.h.
typedef EnqPolicy::ValueType stor::ConcurrentQueue< T, EnqPolicy >::ValueType |
Definition at line 292 of file ConcurrentQueue.h.
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue | ( | SizeType | maxSize = std::numeric_limits<SizeType>::max() , |
detail::MemoryType | maxMemory = std::numeric_limits<detail::MemoryType>::max() |
||
) | [explicit] |
ConcurrentQueue is always bounded. By default, the bound is absurdly large.
Definition at line 506 of file ConcurrentQueue.h.
: protectElements_(), elements_(), capacity_(maxSize), size_(0), memory_(maxMemory), used_(0), elementsDropped_(0) {}
stor::ConcurrentQueue< T, EnqPolicy >::~ConcurrentQueue | ( | ) |
Applications should arrange to make sure that the destructor of a ConcurrentQueue is not called while some other thread is using that queue. There is some protection against doing this, but it seems impossible to make sufficient protection.
Definition at line 520 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); elements_.clear(); size_ = 0; used_ = 0; elementsDropped_ = 0; }
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue | ( | ConcurrentQueue< T, EnqPolicy > const & | ) | [private] |
void stor::ConcurrentQueue< T, EnqPolicy >::addExternallyDroppedEvents | ( | SizeType | n | ) |
Adds the passed count to the counter of dropped events.
Definition at line 681 of file ConcurrentQueue.h.
References CommonMethods::lock(), and n.
{ LockType lock(protectElements_); elementsDropped_ += n; }
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem | ( | T & | item, |
const T & | element | ||
) | [private] |
Definition at line 735 of file ConcurrentQueue.h.
{ item = element; }
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem | ( | std::pair< T, size_t > & | item, |
const T & | element | ||
) | [private] |
Definition at line 742 of file ConcurrentQueue.h.
{ item.first = element; item.second = elementsDropped_; elementsDropped_ = 0; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity | ( | ) | const |
Return the capacity of the queue, that is, the maximum number of items it can contain.
Definition at line 624 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return capacity_; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::clear | ( | void | ) |
Remove all items from the queue. This changes the size to zero but does not change the capacity. Returns the number of cleared events.
Definition at line 668 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); SizeType clearedEvents = size_; elementsDropped_ += size_; elements_.clear(); size_ = 0; used_ = 0; return clearedEvents; }
bool stor::ConcurrentQueue< T, EnqPolicy >::deqNowait | ( | ValueType & | item | ) |
Assign the value at the head of the queue to item and then remove the head of the queue. If successful, return true; on failure, return false. This function will fail without waiting if the queue is empty. This function may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 567 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return removeHeadIfPossible(item); }
bool stor::ConcurrentQueue< T, EnqPolicy >::deqTimedWait | ( | ValueType & | item, |
boost::posix_time::time_duration const & | waitTime | ||
) |
Assign the value at the head of the queue to item and then remove the head of the queue. If the queue is empty, wait until it has become non-empty or until waitTime has passed. Return true if an item has been removed from the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 585 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); if (size_ == 0) { queueNotEmpty_.timed_wait(lock, waitTime); } return removeHeadIfPossible(item); }
void stor::ConcurrentQueue< T, EnqPolicy >::deqWait | ( | ValueType & | item | ) |
Assign the value of the head of the queue to item and then remove the head of the queue. If the queue is empty, wait until it has become non-empty. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.
Definition at line 575 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); while (size_ == 0) queueNotEmpty_.wait(lock); removeHead(item); }
bool stor::ConcurrentQueue< T, EnqPolicy >::empty | ( | void | ) | const |
Return true if the queue is empty, and false if it is not.
Definition at line 600 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return size_ == 0; }
EnqPolicy::ReturnType stor::ConcurrentQueue< T, EnqPolicy >::enqNowait | ( | T const & | item | ) |
Copying a ConcurrentQueue is illegal, as is assigning to a ConcurrentQueue. The copy constructor and copy assignment operator are both private and unimplemented. Add a copy of item to the queue, according to the rules determined by the EnqPolicy; see documentation above the provided EnqPolicy choices. This may throw any exception thrown by the assignment operator of type T, or std::bad_alloc.
Definition at line 531 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return EnqPolicy::doEnq (item, elements_, size_, capacity_, used_, memory_, elementsDropped_, queueNotEmpty_); }
bool stor::ConcurrentQueue< T, EnqPolicy >::enqTimedWait | ( | T const & | p, |
boost::posix_time::time_duration const & | waitTime | ||
) |
Add a copy of item to the queue. If the queue is full, wait until it becomes non-full or until waitTime has passed. Return true if the item has been put onto the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of T, or std::bad_alloc.
Definition at line 552 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); if ( isFull() ) { queueNotFull_.timed_wait(lock, waitTime); } return insertIfPossible(item); }
void stor::ConcurrentQueue< T, EnqPolicy >::enqWait | ( | T const & | p | ) |
Add a copy of item to the queue. If the queue is full, wait until it becomes non-full. This may throw any exception thrown by the assignment operator of type T, or std::bad_alloc.
Definition at line 541 of file ConcurrentQueue.h.
References CommonMethods::lock(), and stor::detail::memoryUsage().
{ LockType lock(protectElements_); while ( isFull() ) queueNotFull_.wait(lock); EnqPolicy::doInsert(item, elements_, size_, detail::memoryUsage(item), used_, queueNotEmpty_); }
bool stor::ConcurrentQueue< T, EnqPolicy >::full | ( | ) | const |
Return true if the queue is full, and false if it is not.
Definition at line 608 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); return isFull(); }
bool stor::ConcurrentQueue< T, EnqPolicy >::insertIfPossible | ( | T const & | item | ) | [private] |
Definition at line 693 of file ConcurrentQueue.h.
References stor::detail::memoryUsage().
{ if ( isFull() ) { ++elementsDropped_; return false; } else { EnqPolicy::doInsert(item, elements_, size_, detail::memoryUsage(item), used_, queueNotEmpty_); return true; } }
bool stor::ConcurrentQueue< T, EnqPolicy >::isFull | ( | ) | const [private] |
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory | ( | ) | const |
Return the memory of the queue in bytes, that is, the maximum memory the items in the queue may occupy.
Definition at line 650 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return memory_; }
ConcurrentQueue& stor::ConcurrentQueue< T, EnqPolicy >::operator= | ( | ConcurrentQueue< T, EnqPolicy > const & | ) | [private] |
void stor::ConcurrentQueue< T, EnqPolicy >::removeHead | ( | ValueType & | item | ) | [private] |
Definition at line 720 of file ConcurrentQueue.h.
References stor::detail::memoryUsage().
{ SequenceType holder; // Move the item out of elements_ in a manner that will not throw. holder.splice(holder.begin(), elements_, elements_.begin()); // Record the change in the length of elements_. --size_; queueNotFull_.notify_one(); assignItem(item, holder.front()); used_ -= detail::memoryUsage( item ); }
bool stor::ConcurrentQueue< T, EnqPolicy >::removeHeadIfPossible | ( | ValueType & | item | ) | [private] |
Definition at line 710 of file ConcurrentQueue.h.
{ if (size_ == 0) return false; removeHead(item); return true; }
bool stor::ConcurrentQueue< T, EnqPolicy >::setCapacity | ( | SizeType | n | ) |
Reset the capacity of the queue. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.
Definition at line 632 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); bool isEmpty = (size_ == 0); if (isEmpty) capacity_ = newcapacity; return isEmpty; }
bool stor::ConcurrentQueue< T, EnqPolicy >::setMemory | ( | detail::MemoryType | n | ) |
Reset the memory usage in bytes of the queue. A value of 0 disables the memory check. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.
Definition at line 658 of file ConcurrentQueue.h.
References CommonMethods::lock().
{ LockType lock(protectElements_); bool isEmpty = (size_ == 0); if (isEmpty) memory_ = newmemory; return isEmpty; }
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::size | ( | void | ) | const |
Return the size of the queue, that is, the number of items it contains.
Definition at line 616 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return size_; }
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used | ( | ) | const |
Return the memory in bytes used by items in the queue.
Definition at line 642 of file ConcurrentQueue.h.
{ // No lock is necessary: the read is atomic. return used_; }
SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity_ [private] |
Definition at line 440 of file ConcurrentQueue.h.
SequenceType stor::ConcurrentQueue< T, EnqPolicy >::elements_ [private] |
Definition at line 439 of file ConcurrentQueue.h.
size_t stor::ConcurrentQueue< T, EnqPolicy >::elementsDropped_ [private] |
Definition at line 451 of file ConcurrentQueue.h.
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory_ [private] |
Definition at line 449 of file ConcurrentQueue.h.
boost::mutex stor::ConcurrentQueue< T, EnqPolicy >::protectElements_ [mutable, private] |
Definition at line 435 of file ConcurrentQueue.h.
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotEmpty_ [mutable, private] |
Definition at line 436 of file ConcurrentQueue.h.
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotFull_ [mutable, private] |
Definition at line 437 of file ConcurrentQueue.h.
SizeType stor::ConcurrentQueue< T, EnqPolicy >::size_ [private] |
Definition at line 441 of file ConcurrentQueue.h.
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used_ [private] |
Definition at line 450 of file ConcurrentQueue.h.