CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
stor::ConcurrentQueue< T, EnqPolicy > Class Template Reference

#include <ConcurrentQueue.h>

Public Types

typedef EnqPolicy::SequenceType SequenceType
 
typedef SequenceType::size_type SizeType
 
typedef EnqPolicy::ValueType ValueType
 

Public Member Functions

void addExternallyDroppedEvents (SizeType)
 
SizeType capacity () const
 
SizeType clear ()
 
 ConcurrentQueue (SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max())
 
bool deqNowait (ValueType &)
 
bool deqTimedWait (ValueType &, boost::posix_time::time_duration const &)
 
void deqWait (ValueType &)
 
bool empty () const
 
EnqPolicy::ReturnType enqNowait (T const &item)
 
bool enqTimedWait (T const &p, boost::posix_time::time_duration const &)
 
void enqWait (T const &p)
 
bool full () const
 
detail::MemoryType memory () const
 
bool setCapacity (SizeType n)
 
bool setMemory (detail::MemoryType n)
 
SizeType size () const
 
detail::MemoryType used () const
 
 ~ConcurrentQueue ()
 

Private Types

typedef boost::mutex::scoped_lock LockType
 

Private Member Functions

void assignItem (T &item, const T &element)
 
void assignItem (std::pair< T, size_t > &item, const T &element)
 
 ConcurrentQueue (ConcurrentQueue< T, EnqPolicy > const &)
 
bool insertIfPossible (T const &item)
 
bool isFull () const
 
ConcurrentQueueoperator= (ConcurrentQueue< T, EnqPolicy > const &)
 
void removeHead (ValueType &item)
 
bool removeHeadIfPossible (ValueType &item)
 

Private Attributes

SizeType capacity_
 
SequenceType elements_
 
size_t elementsDropped_
 
detail::MemoryType memory_
 
boost::mutex protectElements_
 
boost::condition queueNotEmpty_
 
boost::condition queueNotFull_
 
SizeType size_
 
detail::MemoryType used_
 

Detailed Description

template<class T, class EnqPolicy = FailIfFull<T>>
class stor::ConcurrentQueue< T, EnqPolicy >

ConcurrentQueue<T> class template declaration.

Definition at line 290 of file ConcurrentQueue.h.

Member Typedef Documentation

template<class T, class EnqPolicy = FailIfFull<T>>
typedef boost::mutex::scoped_lock stor::ConcurrentQueue< T, EnqPolicy >::LockType
private

Definition at line 434 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
typedef EnqPolicy::SequenceType stor::ConcurrentQueue< T, EnqPolicy >::SequenceType

Definition at line 294 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
typedef SequenceType::size_type stor::ConcurrentQueue< T, EnqPolicy >::SizeType

Definition at line 295 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
typedef EnqPolicy::ValueType stor::ConcurrentQueue< T, EnqPolicy >::ValueType

Definition at line 293 of file ConcurrentQueue.h.

Constructor & Destructor Documentation

template<class T , class EnqPolicy >
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue ( SizeType  maxSize = std::numeric_limits<SizeType>::max(),
detail::MemoryType  maxMemory = std::numeric_limits<detail::MemoryType>::max() 
)
explicit

ConcurrentQueue is always bounded. By default, the bound is absurdly large.

Definition at line 507 of file ConcurrentQueue.h.

510  :
512  elements_(),
514  size_(0),
515  memory_(maxMemory),
516  used_(0),
518  {}
boost::mutex protectElements_
tuple maxSize
&#39;/store/data/Commissioning08/BeamHalo/RECO/StuffAlmostToP5_v1/000/061/642/10A0FE34-A67D-DD11-AD05-000...
detail::MemoryType used_
detail::MemoryType memory_
template<class T , class EnqPolicy >
stor::ConcurrentQueue< T, EnqPolicy >::~ConcurrentQueue ( )

Applications should arrange to make sure that the destructor of a ConcurrentQueue is not called while some other thread is using that queue. There is some protection against doing this, but it seems impossible to make sufficient protection.

Definition at line 521 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

522  {
524  elements_.clear();
525  size_ = 0;
526  used_ = 0;
527  elementsDropped_ = 0;
528  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
detail::MemoryType used_
template<class T, class EnqPolicy = FailIfFull<T>>
stor::ConcurrentQueue< T, EnqPolicy >::ConcurrentQueue ( ConcurrentQueue< T, EnqPolicy > const &  )
private

Member Function Documentation

template<class T , class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::addExternallyDroppedEvents ( SizeType  n)

Adds the passed count to the counter of dropped events

Definition at line 682 of file ConcurrentQueue.h.

References CommonMethods::lock(), and n.

683  {
685  elementsDropped_ += n;
686  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
template<class T, class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem ( T item,
const T element 
)
private

Definition at line 736 of file ConcurrentQueue.h.

737  {
738  item = element;
739  }
template<class T, class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::assignItem ( std::pair< T, size_t > &  item,
const T element 
)
private

Definition at line 743 of file ConcurrentQueue.h.

744  {
745  item.first = element;
746  item.second = elementsDropped_;
747  elementsDropped_ = 0;
748  }
template<class T , class EnqPolicy >
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity ( ) const

Return the capacity of the queue, that is, the maximum number of items it can contain.

Definition at line 625 of file ConcurrentQueue.h.

626  {
627  // No lock is necessary: the read is atomic.
628  return capacity_;
629  }
template<class T , class EnqPolicy >
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::clear ( void  )

Remove all items from the queue. This changes the size to zero but does not change the capacity. Returns the number of cleared events.

Definition at line 669 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

Referenced by Vispa.Views.WidgetView.WidgetView::closeEvent(), Vispa.Views.BoxDecayView.BoxDecayView::closeEvent(), Vispa.Share.FindAlgorithm.FindAlgorithm::findUsingFindDialog(), Vispa.Views.LineDecayView.LineDecayView::setDataObjects(), Vispa.Views.WidgetView.WidgetView::setDataObjects(), Vispa.Views.TreeView.TreeView::updateContent(), Vispa.Views.TableView.TableView::updateContent(), Vispa.Views.BoxDecayView.BoxDecayView::updateContent(), and Vispa.Views.PropertyView.PropertyView::updateContent().

670  {
672  SizeType clearedEvents = size_;
674  elements_.clear();
675  size_ = 0;
676  used_ = 0;
677  return clearedEvents;
678  }
boost::mutex protectElements_
SequenceType::size_type SizeType
boost::mutex::scoped_lock LockType
detail::MemoryType used_
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::deqNowait ( ValueType item)

Assign the value at the head of the queue to item and then remove the head of the queue. If successful, return true; on failure, return false. This function fill fail without waiting if the queue is empty. This function may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.

Definition at line 568 of file ConcurrentQueue.h.

References CommonMethods::lock().

569  {
571  return removeHeadIfPossible(item);
572  }
boost::mutex protectElements_
bool removeHeadIfPossible(ValueType &item)
boost::mutex::scoped_lock LockType
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::deqTimedWait ( ValueType item,
boost::posix_time::time_duration const &  waitTime 
)

Assign the value at the head of the queue to item and then remove the head of the queue. If the queue is empty wait until is has become non-empty or until timeDuration has passed. Return true if an item has been removed from the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.

Definition at line 586 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

590  {
592  if (size_ == 0)
593  {
594  queueNotEmpty_.timed_wait(lock, waitTime);
595  }
596  return removeHeadIfPossible(item);
597  }
boost::mutex protectElements_
bool removeHeadIfPossible(ValueType &item)
boost::mutex::scoped_lock LockType
boost::condition queueNotEmpty_
template<class T , class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::deqWait ( ValueType item)

Assign the value of the head of the queue to item and then remove the head of the queue. If the queue is empty wait until is has become non-empty. This may throw any exception thrown by the assignment operator of type EnqPolicy::ValueType.

Definition at line 576 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

577  {
579  while (size_ == 0) queueNotEmpty_.wait(lock);
580  removeHead(item);
581  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
void removeHead(ValueType &item)
boost::condition queueNotEmpty_
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::empty ( void  ) const

Return true if the queue is empty, and false if it is not.

Definition at line 601 of file ConcurrentQueue.h.

References edm::size_().

Referenced by Vispa.Gui.VispaWidget.TextField::setAutosizeFont(), and Vispa.Gui.VispaWidget.TextField::setAutotruncate().

602  {
603  // No lock is necessary: the read is atomic.
604  return size_ == 0;
605  }
template<class T, class EnqPolicy >
EnqPolicy::ReturnType stor::ConcurrentQueue< T, EnqPolicy >::enqNowait ( T const &  item)

Copying a ConcurrentQueue is illegal, as is asigning to a ConcurrentQueue. The copy constructor and copy assignment operator are both private and unimplemented. Add a copy if item to the queue, according to the rules determined by the EnqPolicy; see documentation above the the provided EnqPolicy choices. This may throw any exception thrown by the assignment operator of type T, or badAlloc.

Definition at line 532 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

533  {
535  return EnqPolicy::doEnq
538  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
detail::MemoryType used_
detail::MemoryType memory_
boost::condition queueNotEmpty_
template<class T, class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::enqTimedWait ( T const &  p,
boost::posix_time::time_duration const &  waitTime 
)

Add a copy of item to the queue. If the queue is full wait until it becomes non-full or until timeDuration has passed. Return true if the items has been put onto the queue or false if the timeout has expired. This may throw any exception thrown by the assignment operator of T, or badAlloc.

Definition at line 553 of file ConcurrentQueue.h.

References CommonMethods::lock().

557  {
559  if ( isFull() )
560  {
561  queueNotFull_.timed_wait(lock, waitTime);
562  }
563  return insertIfPossible(item);
564  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
boost::condition queueNotFull_
bool insertIfPossible(T const &item)
template<class T, class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::enqWait ( T const &  p)

Add a copy of item to the queue. If the queue is full wait until it becomes non-full. This may throw any exception thrown by the assignment operator of type T, or badAlloc.

Definition at line 542 of file ConcurrentQueue.h.

References CommonMethods::lock(), stor::detail::memoryUsage(), and edm::size_().

543  {
545  while ( isFull() ) queueNotFull_.wait(lock);
546  EnqPolicy::doInsert(item, elements_, size_,
548  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
detail::MemoryType used_
MemoryType memoryUsage(const std::pair< T, size_t > &t)
boost::condition queueNotFull_
boost::condition queueNotEmpty_
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::full ( ) const

Return true if the queue is full, and false if it is not.

Definition at line 609 of file ConcurrentQueue.h.

References CommonMethods::lock().

610  {
612  return isFull();
613  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
template<class T, class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::insertIfPossible ( T const &  item)
private

Definition at line 694 of file ConcurrentQueue.h.

References stor::detail::memoryUsage(), and edm::size_().

695  {
696  if ( isFull() )
697  {
699  return false;
700  }
701  else
702  {
703  EnqPolicy::doInsert(item, elements_, size_,
705  return true;
706  }
707  }
detail::MemoryType used_
MemoryType memoryUsage(const std::pair< T, size_t > &t)
boost::condition queueNotEmpty_
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::isFull ( ) const
private

Definition at line 752 of file ConcurrentQueue.h.

References edm::size_().

753  {
754  if (size_ >= capacity_ || used_ >= memory_) return true;
755  return false;
756  }
detail::MemoryType used_
detail::MemoryType memory_
template<class T , class EnqPolicy >
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory ( ) const

Return the memory of the queue in bytes, that is, the maximum memory the items in the queue may occupy

Definition at line 651 of file ConcurrentQueue.h.

652  {
653  // No lock is necessary: the read is atomic.
654  return memory_;
655  }
detail::MemoryType memory_
template<class T, class EnqPolicy = FailIfFull<T>>
ConcurrentQueue& stor::ConcurrentQueue< T, EnqPolicy >::operator= ( ConcurrentQueue< T, EnqPolicy > const &  )
private
template<class T , class EnqPolicy >
void stor::ConcurrentQueue< T, EnqPolicy >::removeHead ( ValueType item)
private

Definition at line 721 of file ConcurrentQueue.h.

References stor::detail::memoryUsage(), and edm::size_().

722  {
723  SequenceType holder;
724  // Move the item out of elements_ in a manner that will not throw.
725  holder.splice(holder.begin(), elements_, elements_.begin());
726  // Record the change in the length of elements_.
727  --size_;
728  queueNotFull_.notify_one();
729 
730  assignItem(item, holder.front());
731  used_ -= detail::memoryUsage( item );
732  }
EnqPolicy::SequenceType SequenceType
detail::MemoryType used_
MemoryType memoryUsage(const std::pair< T, size_t > &t)
boost::condition queueNotFull_
void assignItem(T &item, const T &element)
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::removeHeadIfPossible ( ValueType item)
private

Definition at line 711 of file ConcurrentQueue.h.

References edm::size_().

712  {
713  if (size_ == 0) return false;
714 
715  removeHead(item);
716  return true;
717  }
void removeHead(ValueType &item)
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::setCapacity ( SizeType  n)

Reset the capacity of the queue. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.

Definition at line 633 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

634  {
636  bool isEmpty = (size_ == 0);
637  if (isEmpty) capacity_ = newcapacity;
638  return isEmpty;
639  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
template<class T , class EnqPolicy >
bool stor::ConcurrentQueue< T, EnqPolicy >::setMemory ( detail::MemoryType  n)

Reset the memory usage in bytes of the queue. A value of 0 disabled the memory check. This can only be done if the queue is empty. This function returns false if the queue was not modified, and true if it was modified.

Definition at line 659 of file ConcurrentQueue.h.

References CommonMethods::lock(), and edm::size_().

660  {
662  bool isEmpty = (size_ == 0);
663  if (isEmpty) memory_ = newmemory;
664  return isEmpty;
665  }
boost::mutex protectElements_
boost::mutex::scoped_lock LockType
detail::MemoryType memory_
template<class T , class EnqPolicy >
ConcurrentQueue< T, EnqPolicy >::SizeType stor::ConcurrentQueue< T, EnqPolicy >::size ( void  ) const

Return the size of the queue, that is, the number of items it contains.

Definition at line 617 of file ConcurrentQueue.h.

References edm::size_().

618  {
619  // No lock is necessary: the read is atomic.
620  return size_;
621  }
template<class T , class EnqPolicy >
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used ( ) const

Return the memory in bytes used by items in the queue

Definition at line 643 of file ConcurrentQueue.h.

644  {
645  // No lock is necessary: the read is atomic.
646  return used_;
647  }
detail::MemoryType used_

Member Data Documentation

template<class T, class EnqPolicy = FailIfFull<T>>
SizeType stor::ConcurrentQueue< T, EnqPolicy >::capacity_
private

Definition at line 441 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
SequenceType stor::ConcurrentQueue< T, EnqPolicy >::elements_
private

Definition at line 440 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
size_t stor::ConcurrentQueue< T, EnqPolicy >::elementsDropped_
private

Definition at line 452 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::memory_
private

Definition at line 450 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
boost::mutex stor::ConcurrentQueue< T, EnqPolicy >::protectElements_
mutableprivate

Definition at line 436 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotEmpty_
mutableprivate

Definition at line 437 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
boost::condition stor::ConcurrentQueue< T, EnqPolicy >::queueNotFull_
mutableprivate

Definition at line 438 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
SizeType stor::ConcurrentQueue< T, EnqPolicy >::size_
private

Definition at line 442 of file ConcurrentQueue.h.

template<class T, class EnqPolicy = FailIfFull<T>>
detail::MemoryType stor::ConcurrentQueue< T, EnqPolicy >::used_
private

Definition at line 451 of file ConcurrentQueue.h.