CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ConcurrentQueue.h
Go to the documentation of this file.
1 // $Id: ConcurrentQueue.h,v 1.13 2011/04/07 09:28:22 mommsen Exp $
3 
4 
5 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
6 #define EventFilter_StorageManager_ConcurrentQueue_h
7 
8 #include <algorithm>
9 #include <cstddef>
10 #include <exception>
11 #include <limits>
12 #include <list>
13 
14 #include <iostream> // debugging
15 
16 #include <boost/date_time/posix_time/posix_time_types.hpp>
17 #include <boost/utility/enable_if.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/thread/mutex.hpp>
20 
21 namespace stor
22 {
50  namespace detail
51  {
/* Unit used for the queue's memory accounting (a byte count). */
typedef size_t MemoryType;

/*
  Compile-time trait, implemented with SFINAE, telling whether T has
  a member function with exactly the signature

      MemoryType memoryUsed() const

  hasMemoryUsed<T>::value is true in that case and false otherwise.

  The helper TestConst is parameterised on the tested type C rather
  than on the enclosing T, so that for a T that cannot have members
  at all (e.g. int) the substitution fails quietly and the trait
  yields false instead of a hard compilation error.
*/
template <typename T>
class hasMemoryUsed
{
  typedef char TrueType;
  struct FalseType{ TrueType _[2]; };

  // Only instantiable when its second argument is a pointer to a
  // const member function of C returning MemoryType.
  template <typename C, MemoryType (C::*)() const>
  struct TestConst;

  // Preferred overload; viable only if &C::memoryUsed has the
  // required type.
  template <typename C>
  static TrueType test(TestConst<C, &C::memoryUsed>*);
  // Fallback overload, selected when the one above is not viable.
  template <typename C>
  static FalseType test(...);

public:
  // The size of the selected overload's return type encodes the answer.
  static const bool value = (sizeof(test<T>(0)) == sizeof(TrueType));
};
76 
77  template <typename T>
78  MemoryType
79  memoryUsage(const std::pair<T,size_t>& t)
80  {
81  MemoryType usage(0UL);
82  try
83  {
84  usage = t.first.memoryUsed();
85  }
86  catch(...)
87  {}
88  return usage;
89  }
90 
91  template <typename T>
92  typename boost::enable_if<hasMemoryUsed<T>, MemoryType>::type
93  memoryUsage(const T& t)
94  {
95  MemoryType usage(0UL);
96  try
97  {
98  usage = t.memoryUsed();
99  }
100  catch(...)
101  {}
102  return usage;
103  }
104 
105  template <typename T>
106  typename boost::disable_if<hasMemoryUsed<T>, MemoryType>::type
107  memoryUsage(const T& t)
108  { return sizeof(T); }
109 
110  }// end namespace detail
111 
112 
113  template <class T>
114  struct FailIfFull
115  {
116  typedef void ReturnType;
117 
118  typedef T ValueType;
119  typedef std::list<T> SequenceType;
121 
122  static struct QueueIsFull : public std::exception
123  {
124  virtual const char* what() const throw()
125  {
126  return "Cannot add item to a full queue";
127  }
128  } queueIsFull;
129 
130  static void doInsert
131  (
132  T const& item,
134  SizeType& size,
135  detail::MemoryType const& itemSize,
136  detail::MemoryType& used,
137  boost::condition& nonempty
138  )
139  {
140  elements.push_back(item);
141  ++size;
142  used += itemSize;
143  nonempty.notify_one();
144  }
145 
146  static ReturnType doEnq
147  (
148  T const& item,
149  SequenceType& elements,
150  SizeType& size,
151  SizeType& capacity,
152  detail::MemoryType& used,
154  size_t& elementsDropped,
155  boost::condition& nonempty
156  )
157  {
158  detail::MemoryType itemSize = detail::memoryUsage(item);
159  if (size >= capacity || used+itemSize > memory)
160  {
161  ++elementsDropped;
162  throw queueIsFull;
163  }
164  else
165  {
166  doInsert(item, elements, size, itemSize, used, nonempty);
167  }
168  }
169  };
170 
171 
172  template <class T>
173  struct KeepNewest
174  {
175  typedef std::pair<T,size_t> ValueType;
176  typedef std::list<T> SequenceType;
179 
180  static void doInsert
181  (
182  T const& item,
183  SequenceType& elements,
184  SizeType& size,
185  detail::MemoryType const& itemSize,
186  detail::MemoryType& used,
187  boost::condition& nonempty
188  )
189  {
190  elements.push_back(item);
191  ++size;
192  used += itemSize;
193  nonempty.notify_one();
194  }
195 
196  static ReturnType doEnq
197  (
198  T const& item,
199  SequenceType& elements,
200  SizeType& size,
201  SizeType& capacity,
202  detail::MemoryType& used,
204  size_t& elementsDropped,
205  boost::condition& nonempty
206  )
207  {
208  SizeType elementsRemoved(0);
209  detail::MemoryType itemSize = detail::memoryUsage(item);
210  while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
211  {
212  SequenceType holder;
213  // Move the item out of elements in a manner that will not throw.
214  holder.splice(holder.begin(), elements, elements.begin());
215  // Record the change in the length of elements.
216  --size;
217  used -= detail::memoryUsage( holder.front() );
218  ++elementsRemoved;
219  }
220  if (size < capacity && used+itemSize <= memory)
221  // we succeeded to make enough room for the new element
222  {
223  doInsert(item, elements, size, itemSize, used, nonempty);
224  }
225  else
226  {
227  // we cannot add the new element
228  ++elementsRemoved;
229  }
230  elementsDropped += elementsRemoved;
231  return elementsRemoved;
232  }
233  };
234 
235 
// Enqueue policy struct whose elements are handed out as
// std::pair<T,size_t>: when the item does not fit, the new item is the
// one that is dropped.
// NOTE(review): this is an extracted listing; judging by the gaps in
// the embedded line numbers, the struct head (listing line 237), two
// lines after the SequenceType typedef (241-242) and one doEnq
// parameter (267) are missing here. doEnq's body reads a name "memory"
// that is not among the visible parameters -- presumably the lost one.
236  template <class T>
238  {
239  typedef std::pair<T,size_t> ValueType;
240  typedef std::list<T> SequenceType;
243 
// Unconditionally append item to elements, add one to size and
// itemSize to used, and signal nonempty to one waiting thread.
244  static void doInsert
245  (
246  T const& item,
247  SequenceType& elements,
248  SizeType& size,
249  detail::MemoryType const& itemSize,
250  detail::MemoryType& used,
251  boost::condition& nonempty
252  )
253  {
254  elements.push_back(item);
255  ++size;
256  used += itemSize;
257  nonempty.notify_one();
258  }
259 
// Append item and return 0 if it fits within both the element limit
// (capacity) and the memory limit (memory); otherwise leave the queue
// untouched, increment elementsDropped and return 1.
260  static ReturnType doEnq
261  (
262  T const& item,
263  SequenceType& elements,
264  SizeType& size,
265  SizeType& capacity,
266  detail::MemoryType& used,
268  size_t& elementsDropped,
269  boost::condition& nonempty
270  )
271  {
272  detail::MemoryType itemSize = detail::memoryUsage(item);
273  if (size < capacity && used+itemSize <= memory)
274  {
275  doInsert(item, elements, size, itemSize, used, nonempty);
276  return 0;
277  }
278  ++elementsDropped;
279  return 1;
280  }
281  };
282 
287  template <class T, class EnqPolicy=FailIfFull<T> >
289  {
290  public:
291  typedef typename EnqPolicy::ValueType ValueType;
292  typedef typename EnqPolicy::SequenceType SequenceType;
294 
299  explicit ConcurrentQueue
300  (
303  );
304 
312 
325  typename EnqPolicy::ReturnType enqNowait(T const& item);
326 
332  void enqWait(T const& p);
333 
341  bool enqTimedWait(T const& p, boost::posix_time::time_duration const&);
342 
350  bool deqNowait(ValueType&);
351 
358  void deqWait(ValueType&);
359 
368  bool deqTimedWait(ValueType&, boost::posix_time::time_duration const&);
369 
373  bool empty() const;
374 
378  bool full() const;
379 
384  SizeType size() const;
385 
390  SizeType capacity() const;
391 
397  bool setCapacity(SizeType n);
398 
402  detail::MemoryType used() const;
403 
408  detail::MemoryType memory() const;
409 
417 
423  SizeType clear();
424 
429 
430 
431  private:
432  typedef boost::mutex::scoped_lock LockType;
433 
435  mutable boost::condition queueNotEmpty_;
436  mutable boost::condition queueNotFull_;
437 
441  /*
442  N.B.: we rely on SizeType *not* being some synthesized large
443  type, so that reading the value is an atomic action, as is
444  incrementing or decrementing the value. We do *not* assume that
445  there is any atomic getAndIncrement or getAndDecrement
446  operation.
447  */
451 
452  /*
453  These private member functions assume that whatever locks
454  necessary for safe operation have already been obtained.
455  */
456 
457  /*
458  Insert the given item into the list, if it is not already full,
459  and increment size. Return true if the item is inserted, and
460  false if not.
461  */
462  bool insertIfPossible(T const& item);
463 
464  /*
465  Remove the object at the head of the queue, if there is one, and
466  assign item the value of this object. The assignment may throw an
467  exception; even if it does, the head will have been removed from
468  the queue, and the size appropriately adjusted. The queue may be
469  empty when this is called. Return true if the queue was nonempty,
470  and false if the queue was empty.
471  */
472  bool removeHeadIfPossible(ValueType& item);
473 
474  /*
475  Remove the object at the head of the queue, and assign item the
476  value of this object. The assignment may throw an exception;
477  even if it does, the head will have been removed from the queue,
478  and the size appropriately adjusted. It is assumed the queue is
479  nonempty.
480  */
481  void removeHead(ValueType& item);
482 
483  void assignItem(T& item, const T& element);
484  void assignItem(std::pair<T,size_t>& item, const T& element);
485 
486  /*
487  Return true if the queue cannot accept new entries, false if it can.
488  */
489  bool isFull() const;
490 
491  /*
492  These functions are declared private and not implemented to
493  prevent their use.
494  */
497  };
498 
499  //------------------------------------------------------------------
500  // Implementation follows
501  //------------------------------------------------------------------
502 
503  template <class T, class EnqPolicy>
505  (
507  detail::MemoryType maxMemory
508  ) :
509  protectElements_(),
510  elements_(),
511  capacity_(maxSize),
512  size_(0),
513  memory_(maxMemory),
514  used_(0),
515  elementsDropped_(0)
516  {}
517 
518  template <class T, class EnqPolicy>
520  {
521  LockType lock(protectElements_);
522  elements_.clear();
523  size_ = 0;
524  used_ = 0;
525  elementsDropped_ = 0;
526  }
527 
528  template <class T, class EnqPolicy>
529  typename EnqPolicy::ReturnType
531  {
532  LockType lock(protectElements_);
533  return EnqPolicy::doEnq
534  (item, elements_, size_, capacity_, used_, memory_,
535  elementsDropped_, queueNotEmpty_);
536  }
537 
538  template <class T, class EnqPolicy>
539  void
541  {
542  LockType lock(protectElements_);
543  while ( isFull() ) queueNotFull_.wait(lock);
544  EnqPolicy::doInsert(item, elements_, size_,
545  detail::memoryUsage(item), used_, queueNotEmpty_);
546  }
547 
548  template <class T, class EnqPolicy>
549  bool
551  (
552  T const& item,
553  boost::posix_time::time_duration const& waitTime
554  )
555  {
556  LockType lock(protectElements_);
557  if ( isFull() )
558  {
559  queueNotFull_.timed_wait(lock, waitTime);
560  }
561  return insertIfPossible(item);
562  }
563 
564  template <class T, class EnqPolicy>
565  bool
567  {
568  LockType lock(protectElements_);
569  return removeHeadIfPossible(item);
570  }
571 
572  template <class T, class EnqPolicy>
573  void
575  {
576  LockType lock(protectElements_);
577  while (size_ == 0) queueNotEmpty_.wait(lock);
578  removeHead(item);
579  }
580 
581  template <class T, class EnqPolicy>
582  bool
584  (
585  ValueType& item,
586  boost::posix_time::time_duration const& waitTime
587  )
588  {
589  LockType lock(protectElements_);
590  if (size_ == 0)
591  {
592  queueNotEmpty_.timed_wait(lock, waitTime);
593  }
594  return removeHeadIfPossible(item);
595  }
596 
597  template <class T, class EnqPolicy>
598  bool
600  {
601  // No lock is necessary: the read is atomic.
602  return size_ == 0;
603  }
604 
605  template <class T, class EnqPolicy>
606  bool
608  {
609  LockType lock(protectElements_);
610  return isFull();
611  }
612 
613  template <class T, class EnqPolicy>
616  {
617  // No lock is necessary: the read is atomic.
618  return size_;
619  }
620 
621  template <class T, class EnqPolicy>
624  {
625  // No lock is necessary: the read is atomic.
626  return capacity_;
627  }
628 
629  template <class T, class EnqPolicy>
630  bool
632  {
633  LockType lock(protectElements_);
634  bool isEmpty = (size_ == 0);
635  if (isEmpty) capacity_ = newcapacity;
636  return isEmpty;
637  }
638 
639  template <class T, class EnqPolicy>
642  {
643  // No lock is necessary: the read is atomic.
644  return used_;
645  }
646 
647  template <class T, class EnqPolicy>
650  {
651  // No lock is necessary: the read is atomic.
652  return memory_;
653  }
654 
655  template <class T, class EnqPolicy>
656  bool
658  {
659  LockType lock(protectElements_);
660  bool isEmpty = (size_ == 0);
661  if (isEmpty) memory_ = newmemory;
662  return isEmpty;
663  }
664 
665  template <class T, class EnqPolicy>
668  {
669  LockType lock(protectElements_);
670  SizeType clearedEvents = size_;
671  elementsDropped_ += size_;
672  elements_.clear();
673  size_ = 0;
674  used_ = 0;
675  return clearedEvents;
676  }
677 
678  template <class T, class EnqPolicy>
679  void
681  {
682  LockType lock(protectElements_);
683  elementsDropped_ += n;
684  }
685 
686  //-----------------------------------------------------------
687  // Private member functions
688  //-----------------------------------------------------------
689 
690  template <class T, class EnqPolicy>
691  bool
693  {
694  if ( isFull() )
695  {
696  ++elementsDropped_;
697  return false;
698  }
699  else
700  {
701  EnqPolicy::doInsert(item, elements_, size_,
702  detail::memoryUsage(item), used_, queueNotEmpty_);
703  return true;
704  }
705  }
706 
707  template <class T, class EnqPolicy>
708  bool
710  {
711  if (size_ == 0) return false;
712 
713  removeHead(item);
714  return true;
715  }
716 
717  template <class T, class EnqPolicy>
718  void
720  {
721  SequenceType holder;
722  // Move the item out of elements_ in a manner that will not throw.
723  holder.splice(holder.begin(), elements_, elements_.begin());
724  // Record the change in the length of elements_.
725  --size_;
726  queueNotFull_.notify_one();
727 
728  assignItem(item, holder.front());
729  used_ -= detail::memoryUsage( item );
730  }
731 
732  template <class T, class EnqPolicy>
733  void
735  {
736  item = element;
737  }
738 
// Hand a dequeued element to the caller of a queue whose ValueType is
// std::pair<T,size_t>: item.first becomes a copy of element and
// item.second receives the current dropped-element count, after which
// that counter restarts from zero.
739  template <class T, class EnqPolicy>
740  void
741  ConcurrentQueue<T,EnqPolicy>::assignItem(std::pair<T,size_t>& item, const T& element)
742  {
// The copy comes first: if it throws, the counter below is left as is.
743  item.first = element;
744  item.second = elementsDropped_;
745  elementsDropped_ = 0;
746  }
747 
748  template <class T, class EnqPolicy>
749  bool
751  {
752  if (size_ >= capacity_ || used_ >= memory_) return true;
753  return false;
754  }
755 
756 } // namespace stor
757 
758 #endif // EventFilter_StorageManager_ConcurrentQueue_h
759 
766 
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
type
Definition: HCALResponse.h:22
bool deqNowait(ValueType &)
void deqWait(ValueType &)
std::pair< T, size_t > ValueType
static TrueType test(TestConst<&C::memoryUsed > *)
bool setCapacity(SizeType n)
static boost::mutex mutex
Definition: LHEProxy.cc:11
Definition: vlib.h:187
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
SizeType size() const
bool enqTimedWait(T const &p, boost::posix_time::time_duration const &)
list elements
Definition: asciidump.py:414
ConcurrentQueue & operator=(ConcurrentQueue< T, EnqPolicy > const &)
uint16_t size_type
boost::mutex protectElements_
SequenceType::size_type SizeType
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
EnqPolicy::ValueType ValueType
detail::MemoryType memory() const
EnqPolicy::ReturnType enqNowait(T const &item)
const T & max(const T &a, const T &b)
tuple maxSize
&#39;/store/data/Commissioning08/BeamHalo/RECO/StuffAlmostToP5_v1/000/061/642/10A0FE34-A67D-DD11-AD05-000...
SequenceType::size_type SizeType
bool deqTimedWait(ValueType &, boost::posix_time::time_duration const &)
SequenceType::size_type SizeType
EnqPolicy::SequenceType SequenceType
bool setMemory(detail::MemoryType n)
bool removeHeadIfPossible(ValueType &item)
std::list< T > SequenceType
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
boost::mutex::scoped_lock LockType
detail::MemoryType used_
detail::MemoryType memory_
std::list< T > SequenceType
detail::MemoryType used() const
void enqWait(T const &p)
MemoryType memoryUsage(const std::pair< T, size_t > &t)
SizeType capacity() const
boost::condition queueNotFull_
ConcurrentQueue(SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max())
void assignItem(T &item, const T &element)
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
std::pair< T, size_t > ValueType
void usage()
Definition: array2xmlEB.cc:14
size_(0)
Definition: OwnArray.h:181
std::list< T > SequenceType
bool insertIfPossible(T const &item)
void removeHead(ValueType &item)
stor::FailIfFull::QueueIsFull queueIsFull
long double T
tuple size
Write out results.
boost::condition queueNotEmpty_
virtual const char * what() const
void addExternallyDroppedEvents(SizeType)
SequenceType::size_type SizeType