00001
00003
00004
00005 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
00006 #define EventFilter_StorageManager_ConcurrentQueue_h
00007
00008 #include <algorithm>
00009 #include <cstddef>
00010 #include <exception>
00011 #include <limits>
00012 #include <list>
00013
00014 #include <iostream>
00015
00016 #include <boost/date_time/posix_time/posix_time_types.hpp>
00017 #include <boost/utility/enable_if.hpp>
00018 #include <boost/thread/condition.hpp>
00019 #include <boost/thread/mutex.hpp>
00020
00021 namespace stor
00022 {
00050 namespace detail
00051 {
00052 typedef size_t MemoryType;
00053
00054
00055
00056
00057
00058
00059 template <typename T>
00060 class hasMemoryUsed
00061 {
00062 typedef char TrueType;
00063 struct FalseType{ TrueType _[2]; };
00064
00065 template <MemoryType (T::*)() const>
00066 struct TestConst;
00067
00068 template <typename C>
00069 static TrueType test( TestConst<&C::memoryUsed>* );
00070 template <typename C>
00071 static FalseType test(...);
00072
00073 public:
00074 static const bool value = (sizeof(test<T>(0)) == sizeof(TrueType));
00075 };
00076
00077 template <typename T>
00078 MemoryType
00079 memoryUsage(const std::pair<T,size_t>& t)
00080 {
00081 MemoryType usage(0UL);
00082 try
00083 {
00084 usage = t.first.memoryUsed();
00085 }
00086 catch(...)
00087 {}
00088 return usage;
00089 }
00090
00091 template <typename T>
00092 typename boost::enable_if<hasMemoryUsed<T>, MemoryType>::type
00093 memoryUsage(const T& t)
00094 {
00095 MemoryType usage(0UL);
00096 try
00097 {
00098 usage = t.memoryUsed();
00099 }
00100 catch(...)
00101 {}
00102 return usage;
00103 }
00104
00105 template <typename T>
00106 typename boost::disable_if<hasMemoryUsed<T>, MemoryType>::type
00107 memoryUsage(const T& t)
00108 { return sizeof(T); }
00109
00110 }
00111
00112
00113 template <class T>
00114 struct FailIfFull
00115 {
00116 typedef void ReturnType;
00117
00118 typedef T ValueType;
00119 typedef std::list<T> SequenceType;
00120 typedef typename SequenceType::size_type SizeType;
00121
00122 static struct QueueIsFull : public std::exception
00123 {
00124 virtual const char* what() const throw()
00125 {
00126 return "Cannot add item to a full queue";
00127 }
00128 } queueIsFull;
00129
00130 static void doInsert
00131 (
00132 T const& item,
00133 SequenceType& elements,
00134 SizeType& size,
00135 detail::MemoryType const& itemSize,
00136 detail::MemoryType& used,
00137 boost::condition& nonempty
00138 )
00139 {
00140 elements.push_back(item);
00141 ++size;
00142 used += itemSize;
00143 nonempty.notify_one();
00144 }
00145
00146 static ReturnType doEnq
00147 (
00148 T const& item,
00149 SequenceType& elements,
00150 SizeType& size,
00151 SizeType& capacity,
00152 detail::MemoryType& used,
00153 detail::MemoryType& memory,
00154 size_t& elementsDropped,
00155 boost::condition& nonempty
00156 )
00157 {
00158 detail::MemoryType itemSize = detail::memoryUsage(item);
00159 if (size >= capacity || used+itemSize > memory)
00160 {
00161 ++elementsDropped;
00162 throw queueIsFull;
00163 }
00164 else
00165 {
00166 doInsert(item, elements, size, itemSize, used, nonempty);
00167 }
00168 }
00169 };
00170
00171
00172 template <class T>
00173 struct KeepNewest
00174 {
00175 typedef std::pair<T,size_t> ValueType;
00176 typedef std::list<T> SequenceType;
00177 typedef typename SequenceType::size_type SizeType;
00178 typedef SizeType ReturnType;
00179
00180 static void doInsert
00181 (
00182 T const& item,
00183 SequenceType& elements,
00184 SizeType& size,
00185 detail::MemoryType const& itemSize,
00186 detail::MemoryType& used,
00187 boost::condition& nonempty
00188 )
00189 {
00190 elements.push_back(item);
00191 ++size;
00192 used += itemSize;
00193 nonempty.notify_one();
00194 }
00195
00196 static ReturnType doEnq
00197 (
00198 T const& item,
00199 SequenceType& elements,
00200 SizeType& size,
00201 SizeType& capacity,
00202 detail::MemoryType& used,
00203 detail::MemoryType& memory,
00204 size_t& elementsDropped,
00205 boost::condition& nonempty
00206 )
00207 {
00208 SizeType elementsRemoved(0);
00209 detail::MemoryType itemSize = detail::memoryUsage(item);
00210 while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
00211 {
00212 SequenceType holder;
00213
00214 holder.splice(holder.begin(), elements, elements.begin());
00215
00216 --size;
00217 used -= detail::memoryUsage( holder.front() );
00218 ++elementsRemoved;
00219 }
00220 if (size < capacity && used+itemSize <= memory)
00221
00222 {
00223 doInsert(item, elements, size, itemSize, used, nonempty);
00224 }
00225 else
00226 {
00227
00228 ++elementsRemoved;
00229 }
00230 elementsDropped += elementsRemoved;
00231 return elementsRemoved;
00232 }
00233 };
00234
00235
00236 template <class T>
00237 struct RejectNewest
00238 {
00239 typedef std::pair<T,size_t> ValueType;
00240 typedef std::list<T> SequenceType;
00241 typedef typename SequenceType::size_type SizeType;
00242 typedef SizeType ReturnType;
00243
00244 static void doInsert
00245 (
00246 T const& item,
00247 SequenceType& elements,
00248 SizeType& size,
00249 detail::MemoryType const& itemSize,
00250 detail::MemoryType& used,
00251 boost::condition& nonempty
00252 )
00253 {
00254 elements.push_back(item);
00255 ++size;
00256 used += itemSize;
00257 nonempty.notify_one();
00258 }
00259
00260 static ReturnType doEnq
00261 (
00262 T const& item,
00263 SequenceType& elements,
00264 SizeType& size,
00265 SizeType& capacity,
00266 detail::MemoryType& used,
00267 detail::MemoryType& memory,
00268 size_t& elementsDropped,
00269 boost::condition& nonempty
00270 )
00271 {
00272 detail::MemoryType itemSize = detail::memoryUsage(item);
00273 if (size < capacity && used+itemSize <= memory)
00274 {
00275 doInsert(item, elements, size, itemSize, used, nonempty);
00276 return 0;
00277 }
00278 ++elementsDropped;
00279 return 1;
00280 }
00281 };
00282
00287 template <class T, class EnqPolicy=FailIfFull<T> >
00288 class ConcurrentQueue
00289 {
00290 public:
00291 typedef typename EnqPolicy::ValueType ValueType;
00292 typedef typename EnqPolicy::SequenceType SequenceType;
00293 typedef typename SequenceType::size_type SizeType;
00294
00299 explicit ConcurrentQueue
00300 (
00301 SizeType maxSize = std::numeric_limits<SizeType>::max(),
00302 detail::MemoryType maxMemory = std::numeric_limits<detail::MemoryType>::max()
00303 );
00304
00311 ~ConcurrentQueue();
00312
00325 typename EnqPolicy::ReturnType enqNowait(T const& item);
00326
00332 void enqWait(T const& p);
00333
00341 bool enqTimedWait(T const& p, boost::posix_time::time_duration const&);
00342
00350 bool deqNowait(ValueType&);
00351
00358 void deqWait(ValueType&);
00359
00368 bool deqTimedWait(ValueType&, boost::posix_time::time_duration const&);
00369
00373 bool empty() const;
00374
00378 bool full() const;
00379
00384 SizeType size() const;
00385
00390 SizeType capacity() const;
00391
00397 bool setCapacity(SizeType n);
00398
00402 detail::MemoryType used() const;
00403
00408 detail::MemoryType memory() const;
00409
00416 bool setMemory(detail::MemoryType n);
00417
00423 SizeType clear();
00424
00428 void addExternallyDroppedEvents(SizeType);
00429
00430
00431 private:
00432 typedef boost::mutex::scoped_lock LockType;
00433
00434 mutable boost::mutex protectElements_;
00435 mutable boost::condition queueNotEmpty_;
00436 mutable boost::condition queueNotFull_;
00437
00438 SequenceType elements_;
00439 SizeType capacity_;
00440 SizeType size_;
00441
00442
00443
00444
00445
00446
00447
00448 detail::MemoryType memory_;
00449 detail::MemoryType used_;
00450 size_t elementsDropped_;
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462 bool insertIfPossible(T const& item);
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472 bool removeHeadIfPossible(ValueType& item);
00473
00474
00475
00476
00477
00478
00479
00480
00481 void removeHead(ValueType& item);
00482
00483 void assignItem(T& item, const T& element);
00484 void assignItem(std::pair<T,size_t>& item, const T& element);
00485
00486
00487
00488
00489 bool isFull() const;
00490
00491
00492
00493
00494
00495 ConcurrentQueue(ConcurrentQueue<T,EnqPolicy> const&);
00496 ConcurrentQueue& operator=(ConcurrentQueue<T,EnqPolicy> const&);
00497 };
00498
00499
00500
00501
00502
00503 template <class T, class EnqPolicy>
00504 ConcurrentQueue<T,EnqPolicy>::ConcurrentQueue
00505 (
00506 SizeType maxSize,
00507 detail::MemoryType maxMemory
00508 ) :
00509 protectElements_(),
00510 elements_(),
00511 capacity_(maxSize),
00512 size_(0),
00513 memory_(maxMemory),
00514 used_(0),
00515 elementsDropped_(0)
00516 {}
00517
00518 template <class T, class EnqPolicy>
00519 ConcurrentQueue<T,EnqPolicy>::~ConcurrentQueue()
00520 {
00521 LockType lock(protectElements_);
00522 elements_.clear();
00523 size_ = 0;
00524 used_ = 0;
00525 elementsDropped_ = 0;
00526 }
00527
00528 template <class T, class EnqPolicy>
00529 typename EnqPolicy::ReturnType
00530 ConcurrentQueue<T,EnqPolicy>::enqNowait(T const& item)
00531 {
00532 LockType lock(protectElements_);
00533 return EnqPolicy::doEnq
00534 (item, elements_, size_, capacity_, used_, memory_,
00535 elementsDropped_, queueNotEmpty_);
00536 }
00537
00538 template <class T, class EnqPolicy>
00539 void
00540 ConcurrentQueue<T,EnqPolicy>::enqWait(T const& item)
00541 {
00542 LockType lock(protectElements_);
00543 while ( isFull() ) queueNotFull_.wait(lock);
00544 EnqPolicy::doInsert(item, elements_, size_,
00545 detail::memoryUsage(item), used_, queueNotEmpty_);
00546 }
00547
00548 template <class T, class EnqPolicy>
00549 bool
00550 ConcurrentQueue<T,EnqPolicy>::enqTimedWait
00551 (
00552 T const& item,
00553 boost::posix_time::time_duration const& waitTime
00554 )
00555 {
00556 LockType lock(protectElements_);
00557 if ( isFull() )
00558 {
00559 queueNotFull_.timed_wait(lock, waitTime);
00560 }
00561 return insertIfPossible(item);
00562 }
00563
00564 template <class T, class EnqPolicy>
00565 bool
00566 ConcurrentQueue<T,EnqPolicy>::deqNowait(ValueType& item)
00567 {
00568 LockType lock(protectElements_);
00569 return removeHeadIfPossible(item);
00570 }
00571
00572 template <class T, class EnqPolicy>
00573 void
00574 ConcurrentQueue<T,EnqPolicy>::deqWait(ValueType& item)
00575 {
00576 LockType lock(protectElements_);
00577 while (size_ == 0) queueNotEmpty_.wait(lock);
00578 removeHead(item);
00579 }
00580
00581 template <class T, class EnqPolicy>
00582 bool
00583 ConcurrentQueue<T,EnqPolicy>::deqTimedWait
00584 (
00585 ValueType& item,
00586 boost::posix_time::time_duration const& waitTime
00587 )
00588 {
00589 LockType lock(protectElements_);
00590 if (size_ == 0)
00591 {
00592 queueNotEmpty_.timed_wait(lock, waitTime);
00593 }
00594 return removeHeadIfPossible(item);
00595 }
00596
00597 template <class T, class EnqPolicy>
00598 bool
00599 ConcurrentQueue<T,EnqPolicy>::empty() const
00600 {
00601
00602 return size_ == 0;
00603 }
00604
00605 template <class T, class EnqPolicy>
00606 bool
00607 ConcurrentQueue<T,EnqPolicy>::full() const
00608 {
00609 LockType lock(protectElements_);
00610 return isFull();
00611 }
00612
00613 template <class T, class EnqPolicy>
00614 typename ConcurrentQueue<T,EnqPolicy>::SizeType
00615 ConcurrentQueue<T,EnqPolicy>::size() const
00616 {
00617
00618 return size_;
00619 }
00620
00621 template <class T, class EnqPolicy>
00622 typename ConcurrentQueue<T,EnqPolicy>::SizeType
00623 ConcurrentQueue<T,EnqPolicy>::capacity() const
00624 {
00625
00626 return capacity_;
00627 }
00628
00629 template <class T, class EnqPolicy>
00630 bool
00631 ConcurrentQueue<T,EnqPolicy>::setCapacity(SizeType newcapacity)
00632 {
00633 LockType lock(protectElements_);
00634 bool isEmpty = (size_ == 0);
00635 if (isEmpty) capacity_ = newcapacity;
00636 return isEmpty;
00637 }
00638
00639 template <class T, class EnqPolicy>
00640 detail::MemoryType
00641 ConcurrentQueue<T,EnqPolicy>::used() const
00642 {
00643
00644 return used_;
00645 }
00646
00647 template <class T, class EnqPolicy>
00648 detail::MemoryType
00649 ConcurrentQueue<T,EnqPolicy>::memory() const
00650 {
00651
00652 return memory_;
00653 }
00654
00655 template <class T, class EnqPolicy>
00656 bool
00657 ConcurrentQueue<T,EnqPolicy>::setMemory(detail::MemoryType newmemory)
00658 {
00659 LockType lock(protectElements_);
00660 bool isEmpty = (size_ == 0);
00661 if (isEmpty) memory_ = newmemory;
00662 return isEmpty;
00663 }
00664
00665 template <class T, class EnqPolicy>
00666 typename ConcurrentQueue<T,EnqPolicy>::SizeType
00667 ConcurrentQueue<T,EnqPolicy>::clear()
00668 {
00669 LockType lock(protectElements_);
00670 SizeType clearedEvents = size_;
00671 elementsDropped_ += size_;
00672 elements_.clear();
00673 size_ = 0;
00674 used_ = 0;
00675 return clearedEvents;
00676 }
00677
00678 template <class T, class EnqPolicy>
00679 void
00680 ConcurrentQueue<T,EnqPolicy>::addExternallyDroppedEvents(SizeType n)
00681 {
00682 LockType lock(protectElements_);
00683 elementsDropped_ += n;
00684 }
00685
00686
00687
00688
00689
00690 template <class T, class EnqPolicy>
00691 bool
00692 ConcurrentQueue<T,EnqPolicy>::insertIfPossible(T const& item)
00693 {
00694 if ( isFull() )
00695 {
00696 ++elementsDropped_;
00697 return false;
00698 }
00699 else
00700 {
00701 EnqPolicy::doInsert(item, elements_, size_,
00702 detail::memoryUsage(item), used_, queueNotEmpty_);
00703 return true;
00704 }
00705 }
00706
00707 template <class T, class EnqPolicy>
00708 bool
00709 ConcurrentQueue<T,EnqPolicy>::removeHeadIfPossible(ValueType& item)
00710 {
00711 if (size_ == 0) return false;
00712
00713 removeHead(item);
00714 return true;
00715 }
00716
00717 template <class T, class EnqPolicy>
00718 void
00719 ConcurrentQueue<T,EnqPolicy>::removeHead(ValueType& item)
00720 {
00721 SequenceType holder;
00722
00723 holder.splice(holder.begin(), elements_, elements_.begin());
00724
00725 --size_;
00726 queueNotFull_.notify_one();
00727
00728 assignItem(item, holder.front());
00729 used_ -= detail::memoryUsage( item );
00730 }
00731
00732 template <class T, class EnqPolicy>
00733 void
00734 ConcurrentQueue<T,EnqPolicy>::assignItem(T& item, const T& element)
00735 {
00736 item = element;
00737 }
00738
00739 template <class T, class EnqPolicy>
00740 void
00741 ConcurrentQueue<T,EnqPolicy>::assignItem(std::pair<T,size_t>& item, const T& element)
00742 {
00743 item.first = element;
00744 item.second = elementsDropped_;
00745 elementsDropped_ = 0;
00746 }
00747
00748 template <class T, class EnqPolicy>
00749 bool
00750 ConcurrentQueue<T,EnqPolicy>::isFull() const
00751 {
00752 if (size_ >= capacity_ || used_ >= memory_) return true;
00753 return false;
00754 }
00755
00756 }
00757
00758 #endif // EventFilter_StorageManager_ConcurrentQueue_h
00759
00766