00001
00003
00004
00005 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
00006 #define EventFilter_StorageManager_ConcurrentQueue_h
00007
#include <algorithm>
#include <cstddef>
#include <exception>
#include <iostream>
#include <limits>
#include <list>

#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread_time.hpp>
00020
00021 namespace stor
00022 {
00050 namespace detail
00051 {
/// Unsigned type used for all memory bookkeeping of the queue.
typedef size_t MemoryType;

/**
 * Compile-time trait.
 *
 * hasMemoryUsed<T>::value is true when T itself declares a member
 * function with exactly the signature
 *
 *   MemoryType memoryUsed() const;
 *
 * and false otherwise (no such member, non-const member, or a
 * different return type).
 */
template <typename T>
class hasMemoryUsed
{
  // Two result types that differ in size, so that sizeof can tell
  // which overload of probe() was selected.
  typedef char (&Yes)[1];
  typedef char (&No)[2];

  // Can only be named with a pointer to a const member function of T
  // that returns MemoryType.
  template <MemoryType (T::*)() const>
  struct SignatureProbe;

  // Chosen when &U::memoryUsed has the required signature ...
  template <typename U>
  static Yes probe( SignatureProbe<&U::memoryUsed>* );

  // ... otherwise substitution fails and this catch-all is used.
  template <typename U>
  static No probe(...);

public:
  static const bool value = ( sizeof(probe<T>(0)) == sizeof(Yes) );
};
00076
00077 template <typename T>
00078 MemoryType
00079 memoryUsage(const std::pair<T,size_t>& t)
00080 {
00081 MemoryType usage(0UL);
00082 try
00083 {
00084 usage = t.first.memoryUsed();
00085 }
00086 catch(...)
00087 {}
00088 return usage;
00089 }
00090
00091 template <typename T>
00092 typename boost::enable_if<hasMemoryUsed<T>, MemoryType>::type
00093 memoryUsage(const T& t)
00094 {
00095 MemoryType usage(0UL);
00096 try
00097 {
00098 usage = t.memoryUsed();
00099 }
00100 catch(...)
00101 {}
00102 return usage;
00103 }
00104
00105 template <typename T>
00106 typename boost::disable_if<hasMemoryUsed<T>, MemoryType>::type
00107 memoryUsage(const T& t)
00108 { return sizeof(T); }
00109
00110 }
00111
00112
00113 template <class T>
00114 struct FailIfFull
00115 {
00116 typedef void ReturnType;
00117
00118 typedef T ValueType;
00119 typedef std::list<T> SequenceType;
00120 typedef typename SequenceType::size_type SizeType;
00121
00122 static const struct QueueIsFull : public std::exception
00123 {
00124 QueueIsFull() {};
00125 virtual const char* what() const throw()
00126 {
00127 return "Cannot add item to a full queue";
00128 }
00129 } queueIsFull;
00130
00131 static void doInsert
00132 (
00133 T const& item,
00134 SequenceType& elements,
00135 SizeType& size,
00136 detail::MemoryType const& itemSize,
00137 detail::MemoryType& used,
00138 boost::condition& nonempty
00139 )
00140 {
00141 elements.push_back(item);
00142 ++size;
00143 used += itemSize;
00144 nonempty.notify_one();
00145 }
00146
00147 static ReturnType doEnq
00148 (
00149 T const& item,
00150 SequenceType& elements,
00151 SizeType& size,
00152 SizeType& capacity,
00153 detail::MemoryType& used,
00154 detail::MemoryType& memory,
00155 size_t& elementsDropped,
00156 boost::condition& nonempty
00157 )
00158 {
00159 detail::MemoryType itemSize = detail::memoryUsage(item);
00160 if (size >= capacity || used+itemSize > memory)
00161 {
00162 ++elementsDropped;
00163 throw queueIsFull;
00164 }
00165 else
00166 {
00167 doInsert(item, elements, size, itemSize, used, nonempty);
00168 }
00169 }
00170 };
00171 template <class T>
00172 const typename FailIfFull<T>::QueueIsFull FailIfFull<T>::queueIsFull;
00173
00174 template <class T>
00175 struct KeepNewest
00176 {
00177 typedef std::pair<T,size_t> ValueType;
00178 typedef std::list<T> SequenceType;
00179 typedef typename SequenceType::size_type SizeType;
00180 typedef SizeType ReturnType;
00181
00182 static void doInsert
00183 (
00184 T const& item,
00185 SequenceType& elements,
00186 SizeType& size,
00187 detail::MemoryType const& itemSize,
00188 detail::MemoryType& used,
00189 boost::condition& nonempty
00190 )
00191 {
00192 elements.push_back(item);
00193 ++size;
00194 used += itemSize;
00195 nonempty.notify_one();
00196 }
00197
00198 static ReturnType doEnq
00199 (
00200 T const& item,
00201 SequenceType& elements,
00202 SizeType& size,
00203 SizeType& capacity,
00204 detail::MemoryType& used,
00205 detail::MemoryType& memory,
00206 size_t& elementsDropped,
00207 boost::condition& nonempty
00208 )
00209 {
00210 SizeType elementsRemoved(0);
00211 detail::MemoryType itemSize = detail::memoryUsage(item);
00212 while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
00213 {
00214 SequenceType holder;
00215
00216 holder.splice(holder.begin(), elements, elements.begin());
00217
00218 --size;
00219 used -= detail::memoryUsage( holder.front() );
00220 ++elementsRemoved;
00221 }
00222 if (size < capacity && used+itemSize <= memory)
00223
00224 {
00225 doInsert(item, elements, size, itemSize, used, nonempty);
00226 }
00227 else
00228 {
00229
00230 ++elementsRemoved;
00231 }
00232 elementsDropped += elementsRemoved;
00233 return elementsRemoved;
00234 }
00235 };
00236
00237
00238 template <class T>
00239 struct RejectNewest
00240 {
00241 typedef std::pair<T,size_t> ValueType;
00242 typedef std::list<T> SequenceType;
00243 typedef typename SequenceType::size_type SizeType;
00244 typedef SizeType ReturnType;
00245
00246 static void doInsert
00247 (
00248 T const& item,
00249 SequenceType& elements,
00250 SizeType& size,
00251 detail::MemoryType const& itemSize,
00252 detail::MemoryType& used,
00253 boost::condition& nonempty
00254 )
00255 {
00256 elements.push_back(item);
00257 ++size;
00258 used += itemSize;
00259 nonempty.notify_one();
00260 }
00261
00262 static ReturnType doEnq
00263 (
00264 T const& item,
00265 SequenceType& elements,
00266 SizeType& size,
00267 SizeType& capacity,
00268 detail::MemoryType& used,
00269 detail::MemoryType& memory,
00270 size_t& elementsDropped,
00271 boost::condition& nonempty
00272 )
00273 {
00274 detail::MemoryType itemSize = detail::memoryUsage(item);
00275 if (size < capacity && used+itemSize <= memory)
00276 {
00277 doInsert(item, elements, size, itemSize, used, nonempty);
00278 return 0;
00279 }
00280 ++elementsDropped;
00281 return 1;
00282 }
00283 };
00284
00289 template <class T, class EnqPolicy=FailIfFull<T> >
00290 class ConcurrentQueue
00291 {
00292 public:
00293 typedef typename EnqPolicy::ValueType ValueType;
00294 typedef typename EnqPolicy::SequenceType SequenceType;
00295 typedef typename SequenceType::size_type SizeType;
00296
00301 explicit ConcurrentQueue
00302 (
00303 SizeType maxSize = std::numeric_limits<SizeType>::max(),
00304 detail::MemoryType maxMemory = std::numeric_limits<detail::MemoryType>::max()
00305 );
00306
00313 ~ConcurrentQueue();
00314
00327 typename EnqPolicy::ReturnType enqNowait(T const& item);
00328
00334 void enqWait(T const& p);
00335
00343 bool enqTimedWait(T const& p, boost::posix_time::time_duration const&);
00344
00352 bool deqNowait(ValueType&);
00353
00360 void deqWait(ValueType&);
00361
00370 bool deqTimedWait(ValueType&, boost::posix_time::time_duration const&);
00371
00375 bool empty() const;
00376
00380 bool full() const;
00381
00386 SizeType size() const;
00387
00392 SizeType capacity() const;
00393
00399 bool setCapacity(SizeType n);
00400
00404 detail::MemoryType used() const;
00405
00410 detail::MemoryType memory() const;
00411
00418 bool setMemory(detail::MemoryType n);
00419
00425 SizeType clear();
00426
00430 void addExternallyDroppedEvents(SizeType);
00431
00432
00433 private:
00434 typedef boost::mutex::scoped_lock LockType;
00435
00436 mutable boost::mutex protectElements_;
00437 mutable boost::condition queueNotEmpty_;
00438 mutable boost::condition queueNotFull_;
00439
00440 SequenceType elements_;
00441 SizeType capacity_;
00442 SizeType size_;
00443
00444
00445
00446
00447
00448
00449
00450 detail::MemoryType memory_;
00451 detail::MemoryType used_;
00452 size_t elementsDropped_;
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464 bool insertIfPossible(T const& item);
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474 bool removeHeadIfPossible(ValueType& item);
00475
00476
00477
00478
00479
00480
00481
00482
00483 void removeHead(ValueType& item);
00484
00485 void assignItem(T& item, const T& element);
00486 void assignItem(std::pair<T,size_t>& item, const T& element);
00487
00488
00489
00490
00491 bool isFull() const;
00492
00493
00494
00495
00496
00497 ConcurrentQueue(ConcurrentQueue<T,EnqPolicy> const&);
00498 ConcurrentQueue& operator=(ConcurrentQueue<T,EnqPolicy> const&);
00499 };
00500
00501
00502
00503
00504
00505 template <class T, class EnqPolicy>
00506 ConcurrentQueue<T,EnqPolicy>::ConcurrentQueue
00507 (
00508 SizeType maxSize,
00509 detail::MemoryType maxMemory
00510 ) :
00511 protectElements_(),
00512 elements_(),
00513 capacity_(maxSize),
00514 size_(0),
00515 memory_(maxMemory),
00516 used_(0),
00517 elementsDropped_(0)
00518 {}
00519
00520 template <class T, class EnqPolicy>
00521 ConcurrentQueue<T,EnqPolicy>::~ConcurrentQueue()
00522 {
00523 LockType lock(protectElements_);
00524 elements_.clear();
00525 size_ = 0;
00526 used_ = 0;
00527 elementsDropped_ = 0;
00528 }
00529
00530 template <class T, class EnqPolicy>
00531 typename EnqPolicy::ReturnType
00532 ConcurrentQueue<T,EnqPolicy>::enqNowait(T const& item)
00533 {
00534 LockType lock(protectElements_);
00535 return EnqPolicy::doEnq
00536 (item, elements_, size_, capacity_, used_, memory_,
00537 elementsDropped_, queueNotEmpty_);
00538 }
00539
00540 template <class T, class EnqPolicy>
00541 void
00542 ConcurrentQueue<T,EnqPolicy>::enqWait(T const& item)
00543 {
00544 LockType lock(protectElements_);
00545 while ( isFull() ) queueNotFull_.wait(lock);
00546 EnqPolicy::doInsert(item, elements_, size_,
00547 detail::memoryUsage(item), used_, queueNotEmpty_);
00548 }
00549
00550 template <class T, class EnqPolicy>
00551 bool
00552 ConcurrentQueue<T,EnqPolicy>::enqTimedWait
00553 (
00554 T const& item,
00555 boost::posix_time::time_duration const& waitTime
00556 )
00557 {
00558 LockType lock(protectElements_);
00559 if ( isFull() )
00560 {
00561 queueNotFull_.timed_wait(lock, waitTime);
00562 }
00563 return insertIfPossible(item);
00564 }
00565
00566 template <class T, class EnqPolicy>
00567 bool
00568 ConcurrentQueue<T,EnqPolicy>::deqNowait(ValueType& item)
00569 {
00570 LockType lock(protectElements_);
00571 return removeHeadIfPossible(item);
00572 }
00573
00574 template <class T, class EnqPolicy>
00575 void
00576 ConcurrentQueue<T,EnqPolicy>::deqWait(ValueType& item)
00577 {
00578 LockType lock(protectElements_);
00579 while (size_ == 0) queueNotEmpty_.wait(lock);
00580 removeHead(item);
00581 }
00582
00583 template <class T, class EnqPolicy>
00584 bool
00585 ConcurrentQueue<T,EnqPolicy>::deqTimedWait
00586 (
00587 ValueType& item,
00588 boost::posix_time::time_duration const& waitTime
00589 )
00590 {
00591 LockType lock(protectElements_);
00592 if (size_ == 0)
00593 {
00594 queueNotEmpty_.timed_wait(lock, waitTime);
00595 }
00596 return removeHeadIfPossible(item);
00597 }
00598
00599 template <class T, class EnqPolicy>
00600 bool
00601 ConcurrentQueue<T,EnqPolicy>::empty() const
00602 {
00603
00604 return size_ == 0;
00605 }
00606
00607 template <class T, class EnqPolicy>
00608 bool
00609 ConcurrentQueue<T,EnqPolicy>::full() const
00610 {
00611 LockType lock(protectElements_);
00612 return isFull();
00613 }
00614
00615 template <class T, class EnqPolicy>
00616 typename ConcurrentQueue<T,EnqPolicy>::SizeType
00617 ConcurrentQueue<T,EnqPolicy>::size() const
00618 {
00619
00620 return size_;
00621 }
00622
00623 template <class T, class EnqPolicy>
00624 typename ConcurrentQueue<T,EnqPolicy>::SizeType
00625 ConcurrentQueue<T,EnqPolicy>::capacity() const
00626 {
00627
00628 return capacity_;
00629 }
00630
00631 template <class T, class EnqPolicy>
00632 bool
00633 ConcurrentQueue<T,EnqPolicy>::setCapacity(SizeType newcapacity)
00634 {
00635 LockType lock(protectElements_);
00636 bool isEmpty = (size_ == 0);
00637 if (isEmpty) capacity_ = newcapacity;
00638 return isEmpty;
00639 }
00640
00641 template <class T, class EnqPolicy>
00642 detail::MemoryType
00643 ConcurrentQueue<T,EnqPolicy>::used() const
00644 {
00645
00646 return used_;
00647 }
00648
00649 template <class T, class EnqPolicy>
00650 detail::MemoryType
00651 ConcurrentQueue<T,EnqPolicy>::memory() const
00652 {
00653
00654 return memory_;
00655 }
00656
00657 template <class T, class EnqPolicy>
00658 bool
00659 ConcurrentQueue<T,EnqPolicy>::setMemory(detail::MemoryType newmemory)
00660 {
00661 LockType lock(protectElements_);
00662 bool isEmpty = (size_ == 0);
00663 if (isEmpty) memory_ = newmemory;
00664 return isEmpty;
00665 }
00666
00667 template <class T, class EnqPolicy>
00668 typename ConcurrentQueue<T,EnqPolicy>::SizeType
00669 ConcurrentQueue<T,EnqPolicy>::clear()
00670 {
00671 LockType lock(protectElements_);
00672 SizeType clearedEvents = size_;
00673 elementsDropped_ += size_;
00674 elements_.clear();
00675 size_ = 0;
00676 used_ = 0;
00677 return clearedEvents;
00678 }
00679
00680 template <class T, class EnqPolicy>
00681 void
00682 ConcurrentQueue<T,EnqPolicy>::addExternallyDroppedEvents(SizeType n)
00683 {
00684 LockType lock(protectElements_);
00685 elementsDropped_ += n;
00686 }
00687
00688
00689
00690
00691
00692 template <class T, class EnqPolicy>
00693 bool
00694 ConcurrentQueue<T,EnqPolicy>::insertIfPossible(T const& item)
00695 {
00696 if ( isFull() )
00697 {
00698 ++elementsDropped_;
00699 return false;
00700 }
00701 else
00702 {
00703 EnqPolicy::doInsert(item, elements_, size_,
00704 detail::memoryUsage(item), used_, queueNotEmpty_);
00705 return true;
00706 }
00707 }
00708
00709 template <class T, class EnqPolicy>
00710 bool
00711 ConcurrentQueue<T,EnqPolicy>::removeHeadIfPossible(ValueType& item)
00712 {
00713 if (size_ == 0) return false;
00714
00715 removeHead(item);
00716 return true;
00717 }
00718
00719 template <class T, class EnqPolicy>
00720 void
00721 ConcurrentQueue<T,EnqPolicy>::removeHead(ValueType& item)
00722 {
00723 SequenceType holder;
00724
00725 holder.splice(holder.begin(), elements_, elements_.begin());
00726
00727 --size_;
00728 queueNotFull_.notify_one();
00729
00730 assignItem(item, holder.front());
00731 used_ -= detail::memoryUsage( item );
00732 }
00733
00734 template <class T, class EnqPolicy>
00735 void
00736 ConcurrentQueue<T,EnqPolicy>::assignItem(T& item, const T& element)
00737 {
00738 item = element;
00739 }
00740
00741 template <class T, class EnqPolicy>
00742 void
00743 ConcurrentQueue<T,EnqPolicy>::assignItem(std::pair<T,size_t>& item, const T& element)
00744 {
00745 item.first = element;
00746 item.second = elementsDropped_;
00747 elementsDropped_ = 0;
00748 }
00749
00750 template <class T, class EnqPolicy>
00751 bool
00752 ConcurrentQueue<T,EnqPolicy>::isFull() const
00753 {
00754 if (size_ >= capacity_ || used_ >= memory_) return true;
00755 return false;
00756 }
00757
00758 }
00759
00760 #endif // EventFilter_StorageManager_ConcurrentQueue_h
00761
00768