CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ConcurrentQueue.h
Go to the documentation of this file.
1 // $Id: ConcurrentQueue.h,v 1.15 2013/01/07 11:16:27 eulisse Exp $
3 
4 
5 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
6 #define EventFilter_StorageManager_ConcurrentQueue_h
7 
8 #include <algorithm>
9 #include <cstddef>
10 #include <exception>
11 #include <limits>
12 #include <list>
13 
14 #include <iostream> // debugging
15 
16 #include <boost/date_time/posix_time/posix_time_types.hpp>
17 #include <boost/utility/enable_if.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/thread/mutex.hpp>
20 
21 namespace stor
22 {
50  namespace detail
51  {
52  typedef size_t MemoryType;
53 
54  /*
55  This template is using SFINAE to figure out if the class used to
56  instantiate the ConcurrentQueue template has a method memoryUsed
57  returning the number of bytes occupied by the class itself.
58  */
59  template <typename T>
61  {
62  typedef char TrueType;
63  struct FalseType{ TrueType _[2]; };
64 
65  template <MemoryType (T::*)() const>
66  struct TestConst;
67 
68  template <typename C>
70  template <typename C>
71  static FalseType test(...);
72 
73  public:
74  static const bool value = (sizeof(test<T>(0)) == sizeof(TrueType));
75  };
76 
77  template <typename T>
78  MemoryType
79  memoryUsage(const std::pair<T,size_t>& t)
80  {
81  MemoryType usage(0UL);
82  try
83  {
84  usage = t.first.memoryUsed();
85  }
86  catch(...)
87  {}
88  return usage;
89  }
90 
91  template <typename T>
92  typename boost::enable_if<hasMemoryUsed<T>, MemoryType>::type
93  memoryUsage(const T& t)
94  {
95  MemoryType usage(0UL);
96  try
97  {
98  usage = t.memoryUsed();
99  }
100  catch(...)
101  {}
102  return usage;
103  }
104 
105  template <typename T>
106  typename boost::disable_if<hasMemoryUsed<T>, MemoryType>::type
107  memoryUsage(const T& t)
108  { return sizeof(T); }
109 
110  }// end namespace detail
111 
112 
113  template <class T>
114  struct FailIfFull
115  {
116  typedef void ReturnType;
117 
118  typedef T ValueType;
119  typedef std::list<T> SequenceType;
121 
122  static const struct QueueIsFull : public std::exception
123  {
125  virtual const char* what() const throw()
126  {
127  return "Cannot add item to a full queue";
128  }
129  } queueIsFull;
130 
131  static void doInsert
132  (
133  T const& item,
135  SizeType& size,
136  detail::MemoryType const& itemSize,
137  detail::MemoryType& used,
138  boost::condition& nonempty
139  )
140  {
141  elements.push_back(item);
142  ++size;
143  used += itemSize;
144  nonempty.notify_one();
145  }
146 
147  static ReturnType doEnq
148  (
149  T const& item,
150  SequenceType& elements,
151  SizeType& size,
152  SizeType& capacity,
153  detail::MemoryType& used,
155  size_t& elementsDropped,
156  boost::condition& nonempty
157  )
158  {
159  detail::MemoryType itemSize = detail::memoryUsage(item);
160  if (size >= capacity || used+itemSize > memory)
161  {
162  ++elementsDropped;
163  throw queueIsFull;
164  }
165  else
166  {
167  doInsert(item, elements, size, itemSize, used, nonempty);
168  }
169  }
170  };
171  template <class T>
173 
174  template <class T>
175  struct KeepNewest
176  {
177  typedef std::pair<T,size_t> ValueType;
178  typedef std::list<T> SequenceType;
181 
182  static void doInsert
183  (
184  T const& item,
185  SequenceType& elements,
186  SizeType& size,
187  detail::MemoryType const& itemSize,
188  detail::MemoryType& used,
189  boost::condition& nonempty
190  )
191  {
192  elements.push_back(item);
193  ++size;
194  used += itemSize;
195  nonempty.notify_one();
196  }
197 
198  static ReturnType doEnq
199  (
200  T const& item,
201  SequenceType& elements,
202  SizeType& size,
203  SizeType& capacity,
204  detail::MemoryType& used,
206  size_t& elementsDropped,
207  boost::condition& nonempty
208  )
209  {
210  SizeType elementsRemoved(0);
211  detail::MemoryType itemSize = detail::memoryUsage(item);
212  while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
213  {
214  SequenceType holder;
215  // Move the item out of elements in a manner that will not throw.
216  holder.splice(holder.begin(), elements, elements.begin());
217  // Record the change in the length of elements.
218  --size;
219  used -= detail::memoryUsage( holder.front() );
220  ++elementsRemoved;
221  }
222  if (size < capacity && used+itemSize <= memory)
223  // we succeeded to make enough room for the new element
224  {
225  doInsert(item, elements, size, itemSize, used, nonempty);
226  }
227  else
228  {
229  // we cannot add the new element
230  ++elementsRemoved;
231  }
232  elementsDropped += elementsRemoved;
233  return elementsRemoved;
234  }
235  };
236 
237 
238  template <class T>
240  {
241  typedef std::pair<T,size_t> ValueType;
242  typedef std::list<T> SequenceType;
245 
246  static void doInsert
247  (
248  T const& item,
249  SequenceType& elements,
250  SizeType& size,
251  detail::MemoryType const& itemSize,
252  detail::MemoryType& used,
253  boost::condition& nonempty
254  )
255  {
256  elements.push_back(item);
257  ++size;
258  used += itemSize;
259  nonempty.notify_one();
260  }
261 
262  static ReturnType doEnq
263  (
264  T const& item,
265  SequenceType& elements,
266  SizeType& size,
267  SizeType& capacity,
268  detail::MemoryType& used,
270  size_t& elementsDropped,
271  boost::condition& nonempty
272  )
273  {
274  detail::MemoryType itemSize = detail::memoryUsage(item);
275  if (size < capacity && used+itemSize <= memory)
276  {
277  doInsert(item, elements, size, itemSize, used, nonempty);
278  return 0;
279  }
280  ++elementsDropped;
281  return 1;
282  }
283  };
284 
289  template <class T, class EnqPolicy=FailIfFull<T> >
291  {
292  public:
293  typedef typename EnqPolicy::ValueType ValueType;
294  typedef typename EnqPolicy::SequenceType SequenceType;
296 
301  explicit ConcurrentQueue
302  (
305  );
306 
314 
327  typename EnqPolicy::ReturnType enqNowait(T const& item);
328 
334  void enqWait(T const& p);
335 
343  bool enqTimedWait(T const& p, boost::posix_time::time_duration const&);
344 
352  bool deqNowait(ValueType&);
353 
360  void deqWait(ValueType&);
361 
370  bool deqTimedWait(ValueType&, boost::posix_time::time_duration const&);
371 
375  bool empty() const;
376 
380  bool full() const;
381 
386  SizeType size() const;
387 
392  SizeType capacity() const;
393 
399  bool setCapacity(SizeType n);
400 
404  detail::MemoryType used() const;
405 
410  detail::MemoryType memory() const;
411 
419 
425  SizeType clear();
426 
431 
432 
433  private:
434  typedef boost::mutex::scoped_lock LockType;
435 
437  mutable boost::condition queueNotEmpty_;
438  mutable boost::condition queueNotFull_;
439 
443  /*
444  N.B.: we rely on SizeType *not* being some synthesized large
445  type, so that reading the value is an atomic action, as is
446  incrementing or decrementing the value. We do *not* assume that
447  there is any atomic getAndIncrement or getAndDecrement
448  operation.
449  */
453 
454  /*
455  These private member functions assume that whatever locks
456  necessary for safe operation have already been obtained.
457  */
458 
459  /*
460  Insert the given item into the list, if it is not already full,
461  and increment size. Return true if the item is inserted, and
462  false if not.
463  */
464  bool insertIfPossible(T const& item);
465 
466  /*
467  Remove the object at the head of the queue, if there is one, and
468  assign item the value of this object. The assignment may throw an
469  exception; even if it does, the head will have been removed from
470  the queue, and the size appropriately adjusted. The queue may be
471  empty when this is called. Return true if the queue was nonempty,
472  and false if the queue was empty (item is then left untouched).
473  */
474  bool removeHeadIfPossible(ValueType& item);
475 
476  /*
477  Remove the object at the head of the queue, and assign item the
478  value of this object. The assignment may throw an exception;
479  even if it does, the head will have been removed from the queue,
480  and the size appropriately adjusted. It is assumed the queue is
481  nonempty.
482  */
483  void removeHead(ValueType& item);
484 
485  void assignItem(T& item, const T& element);
486  void assignItem(std::pair<T,size_t>& item, const T& element);
487 
488  /*
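489  Return true if the queue cannot accept new entries (the number of elements has reached the capacity, or the memory used has reached the memory limit), and false otherwise.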
490  */
491  bool isFull() const;
492 
493  /*
494  These functions are declared private and not implemented to
495  prevent their use.
496  */
499  };
500 
501  //------------------------------------------------------------------
502  // Implementation follows
503  //------------------------------------------------------------------
504 
505  template <class T, class EnqPolicy>
507  (
509  detail::MemoryType maxMemory
510  ) :
511  protectElements_(),
512  elements_(),
513  capacity_(maxSize),
514  size_(0),
515  memory_(maxMemory),
516  used_(0),
517  elementsDropped_(0)
518  {}
519 
520  template <class T, class EnqPolicy>
522  {
523  LockType lock(protectElements_);
524  elements_.clear();
525  size_ = 0;
526  used_ = 0;
527  elementsDropped_ = 0;
528  }
529 
530  template <class T, class EnqPolicy>
531  typename EnqPolicy::ReturnType
533  {
534  LockType lock(protectElements_);
535  return EnqPolicy::doEnq
536  (item, elements_, size_, capacity_, used_, memory_,
537  elementsDropped_, queueNotEmpty_);
538  }
539 
540  template <class T, class EnqPolicy>
541  void
543  {
544  LockType lock(protectElements_);
545  while ( isFull() ) queueNotFull_.wait(lock);
546  EnqPolicy::doInsert(item, elements_, size_,
547  detail::memoryUsage(item), used_, queueNotEmpty_);
548  }
549 
550  template <class T, class EnqPolicy>
551  bool
553  (
554  T const& item,
555  boost::posix_time::time_duration const& waitTime
556  )
557  {
558  LockType lock(protectElements_);
559  if ( isFull() )
560  {
561  queueNotFull_.timed_wait(lock, waitTime);
562  }
563  return insertIfPossible(item);
564  }
565 
566  template <class T, class EnqPolicy>
567  bool
569  {
570  LockType lock(protectElements_);
571  return removeHeadIfPossible(item);
572  }
573 
574  template <class T, class EnqPolicy>
575  void
577  {
578  LockType lock(protectElements_);
579  while (size_ == 0) queueNotEmpty_.wait(lock);
580  removeHead(item);
581  }
582 
583  template <class T, class EnqPolicy>
584  bool
586  (
587  ValueType& item,
588  boost::posix_time::time_duration const& waitTime
589  )
590  {
591  LockType lock(protectElements_);
592  if (size_ == 0)
593  {
594  queueNotEmpty_.timed_wait(lock, waitTime);
595  }
596  return removeHeadIfPossible(item);
597  }
598 
599  template <class T, class EnqPolicy>
600  bool
602  {
603  // No lock is necessary: the read is atomic.
604  return size_ == 0;
605  }
606 
607  template <class T, class EnqPolicy>
608  bool
610  {
611  LockType lock(protectElements_);
612  return isFull();
613  }
614 
615  template <class T, class EnqPolicy>
618  {
619  // No lock is necessary: the read is atomic.
620  return size_;
621  }
622 
623  template <class T, class EnqPolicy>
626  {
627  // No lock is necessary: the read is atomic.
628  return capacity_;
629  }
630 
631  template <class T, class EnqPolicy>
632  bool
634  {
635  LockType lock(protectElements_);
636  bool isEmpty = (size_ == 0);
637  if (isEmpty) capacity_ = newcapacity;
638  return isEmpty;
639  }
640 
641  template <class T, class EnqPolicy>
644  {
645  // No lock is necessary: the read is atomic.
646  return used_;
647  }
648 
649  template <class T, class EnqPolicy>
652  {
653  // No lock is necessary: the read is atomic.
654  return memory_;
655  }
656 
657  template <class T, class EnqPolicy>
658  bool
660  {
661  LockType lock(protectElements_);
662  bool isEmpty = (size_ == 0);
663  if (isEmpty) memory_ = newmemory;
664  return isEmpty;
665  }
666 
667  template <class T, class EnqPolicy>
670  {
671  LockType lock(protectElements_);
672  SizeType clearedEvents = size_;
673  elementsDropped_ += size_;
674  elements_.clear();
675  size_ = 0;
676  used_ = 0;
677  return clearedEvents;
678  }
679 
680  template <class T, class EnqPolicy>
681  void
683  {
684  LockType lock(protectElements_);
685  elementsDropped_ += n;
686  }
687 
688  //-----------------------------------------------------------
689  // Private member functions
690  //-----------------------------------------------------------
691 
692  template <class T, class EnqPolicy>
693  bool
695  {
696  if ( isFull() )
697  {
698  ++elementsDropped_;
699  return false;
700  }
701  else
702  {
703  EnqPolicy::doInsert(item, elements_, size_,
704  detail::memoryUsage(item), used_, queueNotEmpty_);
705  return true;
706  }
707  }
708 
709  template <class T, class EnqPolicy>
710  bool
712  {
713  if (size_ == 0) return false;
714 
715  removeHead(item);
716  return true;
717  }
718 
719  template <class T, class EnqPolicy>
720  void
722  {
723  SequenceType holder;
724  // Move the item out of elements_ in a manner that will not throw.
725  holder.splice(holder.begin(), elements_, elements_.begin());
726  // Record the change in the length of elements_.
727  --size_;
728  queueNotFull_.notify_one();
729 
730  assignItem(item, holder.front());
731  used_ -= detail::memoryUsage( item );
732  }
733 
734  template <class T, class EnqPolicy>
735  void
737  {
738  item = element;
739  }
740 
// assignItem overload for a dequeued value of type std::pair<T,size_t>.
//
// Copies the queue element into item.first and hands the current
// elementsDropped_ count to the caller in item.second, then resets
// that counter to zero.
//
// The element is assigned first: if that assignment throws, neither
// item.second nor elementsDropped_ has been modified.
//
// NOTE(review): takes no lock itself; presumably the caller already
// holds protectElements_ -- confirm against the call sites.
741  template <class T, class EnqPolicy>
742  void
743  ConcurrentQueue<T,EnqPolicy>::assignItem(std::pair<T,size_t>& item, const T& element)
744  {
745  item.first = element;
746  item.second = elementsDropped_;
// Restart the drop count once it has been reported to the caller.
747  elementsDropped_ = 0;
748  }
749 
750  template <class T, class EnqPolicy>
751  bool
753  {
754  if (size_ >= capacity_ || used_ >= memory_) return true;
755  return false;
756  }
757 
758 } // namespace stor
759 
760 #endif // EventFilter_StorageManager_ConcurrentQueue_h
761 
768 
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
type
Definition: HCALResponse.h:21
bool deqNowait(ValueType &)
void deqWait(ValueType &)
std::pair< T, size_t > ValueType
static TrueType test(TestConst<&C::memoryUsed > *)
bool setCapacity(SizeType n)
static boost::mutex mutex
Definition: LHEProxy.cc:11
Definition: vlib.h:187
stor::FailIfFull::QueueIsFull queueIsFull
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
SizeType size() const
bool enqTimedWait(T const &p, boost::posix_time::time_duration const &)
list elements
Definition: asciidump.py:414
ConcurrentQueue & operator=(ConcurrentQueue< T, EnqPolicy > const &)
uint16_t size_type
boost::mutex protectElements_
SequenceType::size_type SizeType
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
EnqPolicy::ValueType ValueType
detail::MemoryType memory() const
EnqPolicy::ReturnType enqNowait(T const &item)
const T & max(const T &a, const T &b)
tuple maxSize
&#39;/store/data/Commissioning08/BeamHalo/RECO/StuffAlmostToP5_v1/000/061/642/10A0FE34-A67D-DD11-AD05-000...
SequenceType::size_type SizeType
bool deqTimedWait(ValueType &, boost::posix_time::time_duration const &)
SequenceType::size_type SizeType
EnqPolicy::SequenceType SequenceType
bool setMemory(detail::MemoryType n)
bool removeHeadIfPossible(ValueType &item)
std::list< T > SequenceType
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
boost::mutex::scoped_lock LockType
detail::MemoryType used_
detail::MemoryType memory_
std::list< T > SequenceType
detail::MemoryType used() const
void enqWait(T const &p)
MemoryType memoryUsage(const std::pair< T, size_t > &t)
SizeType capacity() const
boost::condition queueNotFull_
ConcurrentQueue(SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max())
void assignItem(T &item, const T &element)
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
std::pair< T, size_t > ValueType
void usage()
Definition: array2xmlEB.cc:14
size_(0)
Definition: OwnArray.h:181
std::list< T > SequenceType
bool insertIfPossible(T const &item)
void removeHead(ValueType &item)
long double T
tuple size
Write out results.
boost::condition queueNotEmpty_
virtual const char * what() const
void addExternallyDroppedEvents(SizeType)
SequenceType::size_type SizeType