5 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
6 #define EventFilter_StorageManager_ConcurrentQueue_h
16 #include <boost/date_time/posix_time/posix_time_types.hpp>
17 #include <boost/utility/enable_if.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/thread/mutex.hpp>
65 template <MemoryType (T::*)() const>
84 usage = t.first.memoryUsed();
98 usage = t.memoryUsed();
105 template <
typename T>
108 {
return sizeof(
T); }
127 return "Cannot add item to a full queue";
138 boost::condition& nonempty
141 elements.push_back(item);
144 nonempty.notify_one();
155 size_t& elementsDropped,
156 boost::condition& nonempty
160 if (size >= capacity || used+itemSize > memory)
167 doInsert(item, elements, size, itemSize, used, nonempty);
189 boost::condition& nonempty
192 elements.push_back(item);
195 nonempty.notify_one();
206 size_t& elementsDropped,
207 boost::condition& nonempty
212 while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
216 holder.splice(holder.begin(),
elements, elements.begin());
222 if (size < capacity && used+itemSize <= memory)
225 doInsert(item, elements, size, itemSize, used, nonempty);
232 elementsDropped += elementsRemoved;
233 return elementsRemoved;
253 boost::condition& nonempty
256 elements.push_back(item);
259 nonempty.notify_one();
270 size_t& elementsDropped,
271 boost::condition& nonempty
275 if (size < capacity && used+itemSize <= memory)
277 doInsert(item, elements, size, itemSize, used, nonempty);
289 template <
class T,
class EnqPolicy=FailIfFull<T> >
327 typename EnqPolicy::ReturnType
enqNowait(
T const& item);
343 bool enqTimedWait(
T const&
p, boost::posix_time::time_duration
const&);
486 void assignItem(std::pair<T,size_t>& item,
const T& element);
505 template <
class T,
class EnqPolicy>
520 template <
class T,
class EnqPolicy>
527 elementsDropped_ = 0;
530 template <
class T,
class EnqPolicy>
531 typename EnqPolicy::ReturnType
535 return EnqPolicy::doEnq
536 (item, elements_,
size_, capacity_, used_, memory_,
537 elementsDropped_, queueNotEmpty_);
540 template <
class T,
class EnqPolicy>
545 while ( isFull() ) queueNotFull_.wait(lock);
546 EnqPolicy::doInsert(item, elements_,
size_,
550 template <
class T,
class EnqPolicy>
555 boost::posix_time::time_duration
const& waitTime
561 queueNotFull_.timed_wait(lock, waitTime);
563 return insertIfPossible(item);
566 template <
class T,
class EnqPolicy>
571 return removeHeadIfPossible(item);
574 template <
class T,
class EnqPolicy>
579 while (
size_ == 0) queueNotEmpty_.wait(lock);
583 template <
class T,
class EnqPolicy>
588 boost::posix_time::time_duration
const& waitTime
594 queueNotEmpty_.timed_wait(lock, waitTime);
596 return removeHeadIfPossible(item);
599 template <
class T,
class EnqPolicy>
607 template <
class T,
class EnqPolicy>
615 template <
class T,
class EnqPolicy>
623 template <
class T,
class EnqPolicy>
631 template <
class T,
class EnqPolicy>
636 bool isEmpty = (
size_ == 0);
637 if (isEmpty) capacity_ = newcapacity;
641 template <
class T,
class EnqPolicy>
649 template <
class T,
class EnqPolicy>
657 template <
class T,
class EnqPolicy>
662 bool isEmpty = (
size_ == 0);
663 if (isEmpty) memory_ = newmemory;
667 template <
class T,
class EnqPolicy>
673 elementsDropped_ +=
size_;
677 return clearedEvents;
680 template <
class T,
class EnqPolicy>
685 elementsDropped_ +=
n;
692 template <
class T,
class EnqPolicy>
703 EnqPolicy::doInsert(item, elements_,
size_,
709 template <
class T,
class EnqPolicy>
713 if (
size_ == 0)
return false;
719 template <
class T,
class EnqPolicy>
725 holder.splice(holder.begin(), elements_, elements_.begin());
728 queueNotFull_.notify_one();
730 assignItem(item, holder.front());
734 template <
class T,
class EnqPolicy>
741 template <
class T,
class EnqPolicy>
745 item.first = element;
746 item.second = elementsDropped_;
747 elementsDropped_ = 0;
750 template <
class T,
class EnqPolicy>
754 if (
size_ >= capacity_ || used_ >= memory_)
return true;
760 #endif // EventFilter_StorageManager_ConcurrentQueue_h
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
bool deqNowait(ValueType &)
void deqWait(ValueType &)
std::pair< T, size_t > ValueType
static TrueType test(TestConst<&C::memoryUsed > *)
bool setCapacity(SizeType n)
static boost::mutex mutex
stor::FailIfFull::QueueIsFull queueIsFull
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
bool enqTimedWait(T const &p, boost::posix_time::time_duration const &)
ConcurrentQueue & operator=(ConcurrentQueue< T, EnqPolicy > const &)
boost::mutex protectElements_
SequenceType::size_type SizeType
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
EnqPolicy::ValueType ValueType
detail::MemoryType memory() const
EnqPolicy::ReturnType enqNowait(T const &item)
const T & max(const T &a, const T &b)
tuple maxSize
'/store/data/Commissioning08/BeamHalo/RECO/StuffAlmostToP5_v1/000/061/642/10A0FE34-A67D-DD11-AD05-000...
SequenceType::size_type SizeType
bool deqTimedWait(ValueType &, boost::posix_time::time_duration const &)
SequenceType::size_type SizeType
EnqPolicy::SequenceType SequenceType
bool setMemory(detail::MemoryType n)
bool removeHeadIfPossible(ValueType &item)
std::list< T > SequenceType
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
boost::mutex::scoped_lock LockType
detail::MemoryType memory_
std::list< T > SequenceType
detail::MemoryType used() const
MemoryType memoryUsage(const std::pair< T, size_t > &t)
SizeType capacity() const
boost::condition queueNotFull_
ConcurrentQueue(SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max())
void assignItem(T &item, const T &element)
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
std::pair< T, size_t > ValueType
std::list< T > SequenceType
bool insertIfPossible(T const &item)
void removeHead(ValueType &item)
tuple size
Write out results.
boost::condition queueNotEmpty_
virtual const char * what() const
void addExternallyDroppedEvents(SizeType)
SequenceType::size_type SizeType