5 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
6 #define EventFilter_StorageManager_ConcurrentQueue_h
16 #include <boost/date_time/posix_time/posix_time_types.hpp>
17 #include <boost/utility/enable_if.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/thread/mutex.hpp>
65 template <MemoryType (T::*)() const>
84 usage = t.first.memoryUsed();
98 usage = t.memoryUsed();
105 template <
typename T>
108 {
return sizeof(
T); }
126 return "Cannot add item to a full queue";
137 boost::condition& nonempty
140 elements.push_back(item);
143 nonempty.notify_one();
154 size_t& elementsDropped,
155 boost::condition& nonempty
159 if (size >= capacity || used+itemSize > memory)
166 doInsert(item, elements, size, itemSize, used, nonempty);
187 boost::condition& nonempty
190 elements.push_back(item);
193 nonempty.notify_one();
204 size_t& elementsDropped,
205 boost::condition& nonempty
210 while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
214 holder.splice(holder.begin(),
elements, elements.begin());
220 if (size < capacity && used+itemSize <= memory)
223 doInsert(item, elements, size, itemSize, used, nonempty);
230 elementsDropped += elementsRemoved;
231 return elementsRemoved;
251 boost::condition& nonempty
254 elements.push_back(item);
257 nonempty.notify_one();
268 size_t& elementsDropped,
269 boost::condition& nonempty
273 if (size < capacity && used+itemSize <= memory)
275 doInsert(item, elements, size, itemSize, used, nonempty);
287 template <
class T,
class EnqPolicy=FailIfFull<T> >
325 typename EnqPolicy::ReturnType
enqNowait(
T const& item);
341 bool enqTimedWait(
T const&
p, boost::posix_time::time_duration
const&);
484 void assignItem(std::pair<T,size_t>& item,
const T& element);
503 template <
class T,
class EnqPolicy>
518 template <
class T,
class EnqPolicy>
525 elementsDropped_ = 0;
528 template <
class T,
class EnqPolicy>
529 typename EnqPolicy::ReturnType
533 return EnqPolicy::doEnq
534 (item, elements_,
size_, capacity_, used_, memory_,
535 elementsDropped_, queueNotEmpty_);
538 template <
class T,
class EnqPolicy>
543 while ( isFull() ) queueNotFull_.wait(lock);
544 EnqPolicy::doInsert(item, elements_,
size_,
548 template <
class T,
class EnqPolicy>
553 boost::posix_time::time_duration
const& waitTime
559 queueNotFull_.timed_wait(lock, waitTime);
561 return insertIfPossible(item);
564 template <
class T,
class EnqPolicy>
569 return removeHeadIfPossible(item);
572 template <
class T,
class EnqPolicy>
577 while (
size_ == 0) queueNotEmpty_.wait(lock);
581 template <
class T,
class EnqPolicy>
586 boost::posix_time::time_duration
const& waitTime
592 queueNotEmpty_.timed_wait(lock, waitTime);
594 return removeHeadIfPossible(item);
597 template <
class T,
class EnqPolicy>
605 template <
class T,
class EnqPolicy>
613 template <
class T,
class EnqPolicy>
621 template <
class T,
class EnqPolicy>
629 template <
class T,
class EnqPolicy>
634 bool isEmpty = (
size_ == 0);
635 if (isEmpty) capacity_ = newcapacity;
639 template <
class T,
class EnqPolicy>
647 template <
class T,
class EnqPolicy>
655 template <
class T,
class EnqPolicy>
660 bool isEmpty = (
size_ == 0);
661 if (isEmpty) memory_ = newmemory;
665 template <
class T,
class EnqPolicy>
671 elementsDropped_ +=
size_;
675 return clearedEvents;
678 template <
class T,
class EnqPolicy>
683 elementsDropped_ +=
n;
690 template <
class T,
class EnqPolicy>
701 EnqPolicy::doInsert(item, elements_,
size_,
707 template <
class T,
class EnqPolicy>
711 if (
size_ == 0)
return false;
717 template <
class T,
class EnqPolicy>
723 holder.splice(holder.begin(), elements_, elements_.begin());
726 queueNotFull_.notify_one();
728 assignItem(item, holder.front());
732 template <
class T,
class EnqPolicy>
739 template <
class T,
class EnqPolicy>
743 item.first = element;
744 item.second = elementsDropped_;
745 elementsDropped_ = 0;
748 template <
class T,
class EnqPolicy>
752 if (
size_ >= capacity_ || used_ >= memory_)
return true;
758 #endif // EventFilter_StorageManager_ConcurrentQueue_h
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
bool deqNowait(ValueType &)
void deqWait(ValueType &)
std::pair< T, size_t > ValueType
static TrueType test(TestConst<&C::memoryUsed > *)
bool setCapacity(SizeType n)
static boost::mutex mutex
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
bool enqTimedWait(T const &p, boost::posix_time::time_duration const &)
ConcurrentQueue & operator=(ConcurrentQueue< T, EnqPolicy > const &)
boost::mutex protectElements_
SequenceType::size_type SizeType
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
EnqPolicy::ValueType ValueType
detail::MemoryType memory() const
EnqPolicy::ReturnType enqNowait(T const &item)
const T & max(const T &a, const T &b)
tuple maxSize
'/store/data/Commissioning08/BeamHalo/RECO/StuffAlmostToP5_v1/000/061/642/10A0FE34-A67D-DD11-AD05-000...
SequenceType::size_type SizeType
bool deqTimedWait(ValueType &, boost::posix_time::time_duration const &)
SequenceType::size_type SizeType
EnqPolicy::SequenceType SequenceType
bool setMemory(detail::MemoryType n)
bool removeHeadIfPossible(ValueType &item)
std::list< T > SequenceType
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
static ReturnType doEnq(T const &item, SequenceType &elements, SizeType &size, SizeType &capacity, detail::MemoryType &used, detail::MemoryType &memory, size_t &elementsDropped, boost::condition &nonempty)
boost::mutex::scoped_lock LockType
detail::MemoryType memory_
std::list< T > SequenceType
detail::MemoryType used() const
MemoryType memoryUsage(const std::pair< T, size_t > &t)
SizeType capacity() const
boost::condition queueNotFull_
ConcurrentQueue(SizeType maxSize=std::numeric_limits< SizeType >::max(), detail::MemoryType maxMemory=std::numeric_limits< detail::MemoryType >::max())
void assignItem(T &item, const T &element)
static void doInsert(T const &item, SequenceType &elements, SizeType &size, detail::MemoryType const &itemSize, detail::MemoryType &used, boost::condition &nonempty)
std::pair< T, size_t > ValueType
std::list< T > SequenceType
bool insertIfPossible(T const &item)
void removeHead(ValueType &item)
stor::FailIfFull::QueueIsFull queueIsFull
tuple size
Write out results.
boost::condition queueNotEmpty_
virtual const char * what() const
void addExternallyDroppedEvents(SizeType)
SequenceType::size_type SizeType