CMS 3D CMS Logo

Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes

stor::ConcurrentQueue< T, EnqPolicy > Class Template Reference

#include <ConcurrentQueue.h>

List of all members.

Public Types

typedef EnqPolicy::SequenceType SequenceType
typedef SequenceType::size_type SizeType
typedef EnqPolicy::ValueType ValueType

Public Member Functions

void addExternallyDroppedEvents (SizeType)
SizeType capacity () const
SizeType clear ()
 ConcurrentQueue (SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max())
bool deqNowait (ValueType &)
bool deqTimedWait (ValueType &, boost::posix_time::time_duration const &)
void deqWait (ValueType &)
bool empty () const
EnqPolicy::ReturnType enqNowait (T const &item)
bool enqTimedWait (T const &p, boost::posix_time::time_duration const &)
void enqWait (T const &p)
bool full () const
detail::MemoryType memory () const
bool setCapacity (SizeType n)
bool setMemory (detail::MemoryType n)
SizeType size () const
detail::MemoryType used () const
 ~ConcurrentQueue ()

Private Types

typedef boost::mutex::scoped_lock LockType

Private Member Functions

void assignItem (T &item, const T &element)
void assignItem (std::pair< T, size_t > &item, const T &element)
 ConcurrentQueue (ConcurrentQueue< T, EnqPolicy > const &)
bool insertIfPossible (T const &item)
bool isFull () const
ConcurrentQueue & operator= (ConcurrentQueue< T, EnqPolicy > const &)
void removeHead (ValueType &item)
bool removeHeadIfPossible (ValueType &item)

Private Attributes

SizeType capacity_
SequenceType elements_
size_t elementsDropped_
detail::MemoryType memory_
boost::mutex protectElements_
boost::condition queueNotEmpty_
boost::condition queueNotFull_
SizeType size_
detail::MemoryType used_

Detailed Description

template<class T, class EnqPolicy = FailIfFull<T>>
class stor::ConcurrentQueue< T, EnqPolicy >

ConcurrentQueue<T> class template declaration.

Definition at line 288 of file ConcurrentQueue.h.


Member Typedef Documentation

template<class T, class EnqPolicy = FailIfFull<T>>
typedef boost::mutex::scoped_lock stor::ConcurrentQueue< T, EnqPolicy >::LockType [private]

Definition at line 432 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
typedef EnqPolicy::SequenceType stor::ConcurrentQueue< T, EnqPolicy >::SequenceType

Definition at line 292 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
typedef SequenceType::size_type stor::ConcurrentQueue< T, EnqPolicy >::SizeType

Definition at line 293 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
typedef EnqPolicy::ValueType stor::ConcurrentQueue< T, EnqPolicy >::ValueType

Definition at line 291 of file ConcurrentQueue.h.


Constructor & Destructor Documentation

template<class T , class EnqPolicy >
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue ( SizeType  maxSize = std::numeric_limits<SizeType>::max(),
detail::MemoryType  maxMemory = std::numeric_limits<detail::MemoryType>::max() 
) [explicit]

ConcurrentQueue is always bounded. By default, the bound is absurdly large.

Definition at line 505 of file ConcurrentQueue.h.

template<class T , class EnqPolicy >
stor::ConcurrentQueue< T, EnqPolicy >::~ConcurrentQueue ( )

Applications should arrange to make sure that the destructor of a ConcurrentQueue is not called while some other thread is using that queue. There is some protection against doing this, but it seems impossible to make sufficient protection.

Definition at line 519 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

template<class T, class EnqPolicy = FailIfFull<T>>
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue ( ConcurrentQueue< T, EnqPolicy > const &  ) [private]

Member Function Documentation

template<class T , class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::addExternallyDroppedEvents ( SizeType  n)

Adds the passed count to the counter of dropped events

Definition at line 680 of file ConcurrentQueue.h.

References CommonMethods::lock(), and n.

template<class T, class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem ( T &  item,
const T &  element 
) [private]

Definition at line 734 of file ConcurrentQueue.h.

  {
    // Overload for plain elements: copy the queued element into the
    // caller-supplied item.
    item = element;
  }
template<class T, class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem ( std::pair< T, size_t > &  item,
const T &  element 
) [private]

Definition at line 741 of file ConcurrentQueue.h.

  {
    // Overload for (element, drop-count) pairs: hand back the element
    // together with the current dropped-element count.
    item.first = element;
    item.second = elementsDropped_;
    // The count has been reported to the consumer, so start over from zero.
    elementsDropped_ = 0;
  }
template<class T , class EnqPolicy >
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity ( ) const

Return the capacity of the queue, that is, the maximum number of items it can contain.

Definition at line 623 of file ConcurrentQueue.h.

  {
    // No lock is necessary: the read is atomic.
    // NOTE(review): this presumes a read of SizeType is atomic on the target
    // platform; the C++ language itself does not guarantee that -- confirm.
    return capacity_;
  }
template<class T , class EnqPolicy >
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::clear ( void  )

Remove all items from the queue. This changes the size to zero but does not change the capacity. Returns the number of cleared events.

Definition at line 667 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

  {
    // Hold the queue mutex for the whole operation.
    LockType lock(protectElements_);
    // Capture the element count before it is zeroed; this is the return value.
    SizeType clearedEvents = size_;
    // Every cleared element is accounted for as a dropped element.
    elementsDropped_ += size_;
    elements_.clear();
    size_ = 0;
    used_ = 0;
    // NOTE(review): queueNotFull_ is not notified here, so a producer blocked
    // in enqWait() presumably stays blocked until the next dequeue -- confirm
    // that this is intended.
    return clearedEvents;
  }
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::deqNowait ( ValueType &  item)

Assign the value at the head of the queue to item and then remove the head of the queue. If successful, return true; on failure, return false. This function will fail without waiting if the queue is empty. This function may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.

Definition at line 566 of file ConcurrentQueue.h.

References CommonMethods::lock().

template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::deqTimedWait ( ValueType &  item,
boost::posix_time::time_duration const &  waitTime 
)

Assign the value at the head of the queue to item and then remove the head of the queue. If the queue is empty, wait until it has become non-empty or until waitTime has passed. Return true if an item has been removed from the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.

Definition at line 584 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

  {
    LockType lock(protectElements_);
    if (size_ == 0)
    {
      // Wait at most once; the result of the wait itself is ignored.
      queueNotEmpty_.timed_wait(lock, waitTime);
    }
    // removeHeadIfPossible() re-checks size_, so if the queue is still empty
    // after the wait returns, the call simply reports false.
    return removeHeadIfPossible(item);
  }
template<class T , class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::deqWait ( ValueType &  item)

Assign the value of the head of the queue to item and then remove the head of the queue. If the queue is empty, wait until it has become non-empty. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.

Definition at line 574 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

  {
    LockType lock(protectElements_);
    // Loop rather than wait once: if the queue is still empty when the wait
    // returns, go back to waiting.
    while (size_ == 0) queueNotEmpty_.wait(lock);
    // The queue is non-empty here and the lock is still held.
    removeHead(item);
  }
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::empty ( void  ) const

Return true if the queue is empty, and false if it is not.

Definition at line 599 of file ConcurrentQueue.h.

References edm::size_().

  {
    // No lock is necessary: the read is atomic.
    // NOTE(review): this presumes a read of SizeType is atomic on the target
    // platform; the C++ language itself does not guarantee that -- confirm.
    // The answer may already be stale by the time the caller acts on it.
    return size_ == 0;
  }
template<class T, class EnqPolicy >
EnqPolicy::ReturnType stor::ConcurrentQueue< T, EnqPolicy >::enqNowait ( T const &  item)

Add a copy of item to the queue, according to the rules determined by the EnqPolicy; see the documentation of the provided EnqPolicy choices. This may throw any exception thrown by the assignment operator of type T, or badAlloc. (Note: copying a ConcurrentQueue is illegal, as is assigning to a ConcurrentQueue. The copy constructor and copy assignment operator are both private and unimplemented.)

Definition at line 530 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

template<class T, class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::enqTimedWait ( T const &  p,
boost::posix_time::time_duration const &  waitTime 
)

Add a copy of item to the queue. If the queue is full, wait until it becomes non-full or until waitTime has passed. Return true if the item has been put onto the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of T, or badAlloc.

Definition at line 551 of file ConcurrentQueue.h.

References CommonMethods::lock().

  {
    // NOTE(review): the rendered signature names the first parameter `p`,
    // while this body uses `item`; presumably the declaration and the
    // definition use different parameter names -- confirm against the header.
    LockType lock(protectElements_);
    if ( isFull() )
    {
      // Wait at most once; the result of the wait itself is ignored.
      queueNotFull_.timed_wait(lock, waitTime);
    }
    // insertIfPossible() re-checks fullness; if the queue is still full the
    // item is counted as dropped and false is returned.
    return insertIfPossible(item);
  }
template<class T, class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::enqWait ( T const &  p)

Add a copy of item to the queue. If the queue is full wait until it becomes non-full. This may throw any exception thrown by the assignment operator of type T, or badAlloc.

Definition at line 540 of file ConcurrentQueue.h.

References CommonMethods::lock(), stor::detail::memoryUsage(), and edm::size_().

  {
    // NOTE(review): the rendered signature names the parameter `p`, while
    // this body uses `item` -- confirm against the header.
    LockType lock(protectElements_);
    // Loop rather than wait once: if the queue is still full when the wait
    // returns, go back to waiting.
    while ( isFull() ) queueNotFull_.wait(lock);
    // The insertion itself is delegated to the enqueue policy, which receives
    // the element count, the item's memory usage, the running memory total
    // and the not-empty condition (presumably to update the bookkeeping and
    // wake a waiting consumer -- doInsert is not shown here).
    EnqPolicy::doInsert(item, elements_, size_,
      detail::memoryUsage(item), used_, queueNotEmpty_);
  }
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::full ( ) const

Return true if the queue is full, and false if it is not.

Definition at line 607 of file ConcurrentQueue.h.

References CommonMethods::lock().

template<class T, class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::insertIfPossible ( T const &  item) [private]

Definition at line 692 of file ConcurrentQueue.h.

References stor::detail::memoryUsage(), and edm::size_().

  {
    // Does not take the mutex itself; enqTimedWait() calls it with
    // protectElements_ already held.
    if ( isFull() )
    {
      // No room: record the item as dropped and report failure.
      ++elementsDropped_;
      return false;
    }
    else
    {
      // Room available: delegate the insertion to the enqueue policy.
      EnqPolicy::doInsert(item, elements_, size_,
      detail::memoryUsage(item), used_, queueNotEmpty_);
      return true;
    }
  }
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::isFull ( ) const [private]

Definition at line 750 of file ConcurrentQueue.h.

References edm::size_().

  {
    // The queue is full as soon as either limit is reached: the element
    // count (capacity_) or the memory used by the elements (memory_).
    if (size_ >= capacity_ || used_ >= memory_) return true;
    return false;
  }
template<class T , class EnqPolicy >
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory ( ) const

Return the memory of the queue in bytes, that is, the maximum memory the items in the queue may occupy

Definition at line 649 of file ConcurrentQueue.h.

  {
    // No lock is necessary: the read is atomic.
    // NOTE(review): this presumes a read of detail::MemoryType is atomic on
    // the target platform; the C++ language does not guarantee it -- confirm.
    return memory_;
  }
template<class T, class EnqPolicy = FailIfFull<T>>
ConcurrentQueue& stor::ConcurrentQueue< T, EnqPolicy >::operator= ( ConcurrentQueue< T, EnqPolicy > const &  ) [private]
template<class T , class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::removeHead ( ValueType &  item) [private]

Definition at line 719 of file ConcurrentQueue.h.

References stor::detail::memoryUsage(), and edm::size_().

  {
    // Precondition: the queue is not empty. Both callers shown on this page
    // (deqWait and removeHeadIfPossible) check size_ before calling.
    SequenceType holder;
    // Move the item out of elements_ in a manner that will not throw.
    holder.splice(holder.begin(), elements_, elements_.begin());
    // Record the change in the length of elements_.
    --size_;
    // One slot has been freed: wake one thread waiting on queueNotFull_.
    queueNotFull_.notify_one();

    // Copy the detached element into the caller's item; this step may throw,
    // but the element has already left elements_ by then.
    assignItem(item, holder.front());
    // Give back the memory accounted to the removed element.
    used_ -= detail::memoryUsage( item );
  }
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::removeHeadIfPossible ( ValueType &  item) [private]

Definition at line 709 of file ConcurrentQueue.h.

References edm::size_().

  {
    // Nothing to remove from an empty queue; item is left untouched.
    if (size_ == 0) return false;

    // Non-empty: detach the head element into item.
    removeHead(item);
    return true;
  }
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::setCapacity ( SizeType  n)

Reset the capacity of the queue. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.

Definition at line 631 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

  {
    // NOTE(review): the rendered signature names the parameter `n`, while
    // this body uses `newcapacity` -- confirm against the header.
    LockType lock(protectElements_);
    // The capacity may only be changed while the queue holds no elements.
    bool isEmpty = (size_ == 0);
    if (isEmpty) capacity_ = newcapacity;
    // True exactly when the capacity was updated.
    return isEmpty;
  }
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::setMemory ( detail::MemoryType  n)

Reset the memory usage in bytes of the queue. A value of 0 disables the memory check. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.

Definition at line 657 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

  {
    // NOTE(review): the rendered signature names the parameter `n`, while
    // this body uses `newmemory` -- confirm against the header.
    LockType lock(protectElements_);
    // The memory limit may only be changed while the queue holds no elements.
    bool isEmpty = (size_ == 0);
    if (isEmpty) memory_ = newmemory;
    // True exactly when the limit was updated.
    return isEmpty;
  }
template<class T , class EnqPolicy >
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::size ( void  ) const

Return the size of the queue, that is, the number of items it contains.

Definition at line 615 of file ConcurrentQueue.h.

References edm::size_().

  {
    // No lock is necessary: the read is atomic.
    // NOTE(review): this presumes a read of SizeType is atomic on the target
    // platform; the C++ language itself does not guarantee that -- confirm.
    // The value may already be stale by the time the caller acts on it.
    return size_;
  }
template<class T , class EnqPolicy >
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used ( ) const

Return the memory in bytes used by items in the queue

Definition at line 641 of file ConcurrentQueue.h.

  {
    // No lock is necessary: the read is atomic.
    // NOTE(review): this presumes a read of detail::MemoryType is atomic on
    // the target platform; the C++ language does not guarantee it -- confirm.
    return used_;
  }

Member Data Documentation

template<class T, class EnqPolicy = FailIfFull<T>>
SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity_ [private]

Definition at line 439 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
SequenceType stor::ConcurrentQueue< T, EnqPolicy >::elements_ [private]

Definition at line 438 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
size_t stor::ConcurrentQueue< T, EnqPolicy >::elementsDropped_ [private]

Definition at line 450 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory_ [private]

Definition at line 448 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
boost::mutex stor::ConcurrentQueue< T, EnqPolicy >::protectElements_ [mutable, private]

Definition at line 434 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotEmpty_ [mutable, private]

Definition at line 435 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotFull_ [mutable, private]

Definition at line 436 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
SizeType stor::ConcurrentQueue< T, EnqPolicy >::size_ [private]

Definition at line 440 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used_ [private]

Definition at line 449 of file ConcurrentQueue.h.