CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_1/src/EventFilter/StorageManager/interface/ConcurrentQueue.h

Go to the documentation of this file.
00001 // $Id: ConcurrentQueue.h,v 1.13 2011/04/07 09:28:22 mommsen Exp $
00003 
00004 
00005 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
00006 #define EventFilter_StorageManager_ConcurrentQueue_h
00007 
00008 #include <algorithm>
00009 #include <cstddef>
00010 #include <exception>
00011 #include <limits>
00012 #include <list>
00013 
00014 #include <iostream> // debugging
00015 
00016 #include <boost/date_time/posix_time/posix_time_types.hpp>
00017 #include <boost/utility/enable_if.hpp>
00018 #include <boost/thread/condition.hpp>
00019 #include <boost/thread/mutex.hpp>
00020 
00021 namespace stor
00022 {
00050   namespace detail
00051   {
00052     typedef size_t MemoryType;
00053 
00054     /*
00055       This template is using SFINAE to figure out if the class used to
00056       instantiate the ConcurrentQueue template has a method memoryUsed
00057       returning the number of bytes occupied by the class itself.
00058     */
00059     template <typename T>
00060     class hasMemoryUsed
00061     {
00062       typedef char TrueType;
00063       struct FalseType{ TrueType _[2]; };
00064       
00065       template <MemoryType (T::*)() const>
00066       struct TestConst;
00067       
00068       template <typename C>
00069       static TrueType test( TestConst<&C::memoryUsed>* );
00070       template <typename C>
00071       static FalseType test(...);
00072       
00073     public:
00074       static const bool value = (sizeof(test<T>(0)) == sizeof(TrueType));
00075     };
00076     
00077     template <typename T>
00078     MemoryType 
00079     memoryUsage(const std::pair<T,size_t>& t)
00080     {
00081       MemoryType usage(0UL);
00082       try
00083       {
00084         usage = t.first.memoryUsed();
00085       }
00086       catch(...)
00087       {}
00088       return usage;
00089     }
00090     
00091     template <typename T>
00092     typename boost::enable_if<hasMemoryUsed<T>, MemoryType>::type
00093     memoryUsage(const T& t)
00094     {
00095       MemoryType usage(0UL);
00096       try
00097       {
00098         usage = t.memoryUsed();
00099       }
00100       catch(...)
00101       {}
00102       return usage;
00103     }
00104   
00105     template <typename T>
00106     typename boost::disable_if<hasMemoryUsed<T>, MemoryType>::type
00107     memoryUsage(const T& t)
00108     { return sizeof(T); }
00109 
00110   }// end namespace detail
00111 
00112   
00113   template <class T>
00114   struct FailIfFull
00115   {
00116     typedef void ReturnType;
00117 
00118     typedef T ValueType;
00119     typedef std::list<T> SequenceType;
00120     typedef typename SequenceType::size_type SizeType;
00121 
00122     static struct QueueIsFull : public std::exception
00123     {
00124       virtual const char* what() const throw()
00125       {
00126         return "Cannot add item to a full queue";
00127       }
00128     } queueIsFull;
00129 
00130     static void doInsert
00131     (
00132       T const& item,
00133       SequenceType& elements,
00134       SizeType& size,
00135       detail::MemoryType const& itemSize,
00136       detail::MemoryType& used,
00137       boost::condition& nonempty
00138     )
00139     {
00140       elements.push_back(item);
00141       ++size;
00142       used += itemSize;
00143       nonempty.notify_one();
00144     }
00145                
00146     static ReturnType doEnq
00147     (
00148       T const& item,
00149       SequenceType& elements,
00150       SizeType& size,
00151       SizeType& capacity,
00152       detail::MemoryType& used,
00153       detail::MemoryType& memory,
00154       size_t& elementsDropped,
00155       boost::condition& nonempty
00156     )
00157     {
00158       detail::MemoryType itemSize = detail::memoryUsage(item);
00159       if (size >= capacity || used+itemSize > memory)
00160       {
00161         ++elementsDropped;
00162         throw queueIsFull;
00163       }
00164       else
00165       {
00166         doInsert(item, elements, size, itemSize, used, nonempty);
00167       }
00168     }         
00169   };
00170 
00171 
00172   template <class T>
00173   struct KeepNewest
00174   {
00175     typedef std::pair<T,size_t> ValueType;
00176     typedef std::list<T> SequenceType;
00177     typedef typename SequenceType::size_type SizeType;
00178     typedef SizeType ReturnType;
00179 
00180     static void doInsert
00181     (
00182       T const& item,
00183       SequenceType& elements,
00184       SizeType& size,
00185       detail::MemoryType const& itemSize,
00186       detail::MemoryType& used,
00187       boost::condition& nonempty
00188     )
00189     {
00190       elements.push_back(item);
00191       ++size;
00192       used += itemSize;
00193       nonempty.notify_one();
00194     }
00195 
00196     static ReturnType doEnq
00197     (
00198       T const& item,
00199       SequenceType& elements,
00200       SizeType& size,
00201       SizeType& capacity,
00202       detail::MemoryType& used,
00203       detail::MemoryType& memory,
00204       size_t& elementsDropped,
00205       boost::condition& nonempty
00206     )
00207     {
00208       SizeType elementsRemoved(0);
00209       detail::MemoryType itemSize = detail::memoryUsage(item);
00210       while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
00211       {
00212         SequenceType holder;
00213         // Move the item out of elements in a manner that will not throw.
00214         holder.splice(holder.begin(), elements, elements.begin());
00215         // Record the change in the length of elements.
00216         --size;
00217         used -= detail::memoryUsage( holder.front() );
00218         ++elementsRemoved;
00219       }
00220       if (size < capacity && used+itemSize <= memory)
00221         // we succeeded to make enough room for the new element
00222       {
00223         doInsert(item, elements, size, itemSize, used, nonempty);
00224       }
00225       else
00226       {
00227         // we cannot add the new element
00228         ++elementsRemoved;
00229       }
00230       elementsDropped += elementsRemoved;
00231       return elementsRemoved;
00232     }
00233   };
00234 
00235 
00236   template <class T>
00237   struct RejectNewest
00238   {
00239     typedef std::pair<T,size_t> ValueType;
00240     typedef std::list<T> SequenceType;
00241     typedef typename SequenceType::size_type SizeType;
00242     typedef SizeType ReturnType;
00243 
00244     static void doInsert
00245     (
00246       T const& item,
00247       SequenceType& elements,
00248       SizeType& size,
00249       detail::MemoryType const& itemSize,
00250       detail::MemoryType& used,
00251       boost::condition& nonempty
00252     )
00253     {
00254       elements.push_back(item);
00255       ++size;
00256       used += itemSize;
00257       nonempty.notify_one();
00258     }
00259 
00260     static ReturnType doEnq
00261     (
00262       T const& item,
00263       SequenceType& elements,
00264       SizeType& size,
00265       SizeType& capacity,
00266       detail::MemoryType& used,
00267       detail::MemoryType& memory,
00268       size_t& elementsDropped,
00269       boost::condition& nonempty
00270     )
00271     {
00272       detail::MemoryType itemSize = detail::memoryUsage(item);
00273       if (size < capacity && used+itemSize <= memory)
00274       {
00275         doInsert(item, elements, size, itemSize, used, nonempty);
00276         return 0;
00277       }
00278       ++elementsDropped;
00279       return 1;
00280     }
00281   };
00282 
00287   template <class T, class EnqPolicy=FailIfFull<T> >
00288   class ConcurrentQueue
00289   {
00290   public:
00291     typedef typename EnqPolicy::ValueType ValueType;
00292     typedef typename EnqPolicy::SequenceType SequenceType;
00293     typedef typename SequenceType::size_type SizeType;
00294 
00299     explicit ConcurrentQueue
00300     (
00301       SizeType maxSize = std::numeric_limits<SizeType>::max(),
00302       detail::MemoryType maxMemory = std::numeric_limits<detail::MemoryType>::max()
00303     );
00304 
00311     ~ConcurrentQueue();
00312 
00325     typename EnqPolicy::ReturnType enqNowait(T const& item);
00326 
00332     void enqWait(T const& p);
00333 
00341     bool enqTimedWait(T const& p, boost::posix_time::time_duration const&);
00342 
00350     bool deqNowait(ValueType&);
00351 
00358     void deqWait(ValueType&);
00359 
00368     bool deqTimedWait(ValueType&, boost::posix_time::time_duration const&);
00369 
00373     bool empty() const;
00374 
00378     bool full() const;
00379 
00384     SizeType size() const;
00385 
00390     SizeType capacity() const;
00391 
00397     bool setCapacity(SizeType n);
00398 
00402     detail::MemoryType used() const;
00403 
00408     detail::MemoryType memory() const;
00409 
00416     bool setMemory(detail::MemoryType n);
00417 
00423     SizeType clear();
00424 
00428     void addExternallyDroppedEvents(SizeType);
00429     
00430 
00431   private:
00432     typedef boost::mutex::scoped_lock LockType;
00433 
00434     mutable boost::mutex protectElements_;
00435     mutable boost::condition queueNotEmpty_;
00436     mutable boost::condition queueNotFull_;
00437 
00438     SequenceType elements_;
00439     SizeType capacity_;
00440     SizeType size_;
00441     /*
00442       N.B.: we rely on SizeType *not* being some synthesized large
00443       type, so that reading the value is an atomic action, as is
00444       incrementing or decrementing the value. We do *not* assume that
00445       there is any atomic getAndIncrement or getAndDecrement
00446       operation.
00447     */
00448     detail::MemoryType memory_;
00449     detail::MemoryType used_;
00450     size_t elementsDropped_;
00451 
00452     /*
00453       These private member functions assume that whatever locks
00454       necessary for safe operation have already been obtained.
00455      */
00456 
00457     /*
00458       Insert the given item into the list, if it is not already full,
00459       and increment size. Return true if the item is inserted, and
00460       false if not.
00461     */
00462     bool insertIfPossible(T const& item);
00463 
00464     /*
00465       Remove the object at the head of the queue, if there is one, and
00466       assign item the value of this object.The assignment may throw an
00467       exception; even if it does, the head will have been removed from
00468       the queue, and the size appropriately adjusted. It is assumed
00469       the queue is nonempty. Return true if the queue was nonempty,
00470       and false if the queue was empty.
00471      */
00472     bool removeHeadIfPossible(ValueType& item);
00473 
00474     /*
00475       Remove the object at the head of the queue, and assign item the
00476       value of this object. The assignment may throw an exception;
00477       even if it does, the head will have been removed from the queue,
00478       and the size appropriately adjusted. It is assumed the queue is
00479       nonempty.
00480      */
00481     void removeHead(ValueType& item);
00482 
00483     void assignItem(T& item, const T& element);
00484     void assignItem(std::pair<T,size_t>& item, const T& element);
00485 
00486     /*
00487       Return false if the queue can accept new entries.
00488      */
00489     bool isFull() const;
00490 
00491     /*
00492       These functions are declared private and not implemented to
00493       prevent their use.
00494      */
00495     ConcurrentQueue(ConcurrentQueue<T,EnqPolicy> const&);
00496     ConcurrentQueue& operator=(ConcurrentQueue<T,EnqPolicy> const&);
00497   };
00498 
00499   //------------------------------------------------------------------
00500   // Implementation follows
00501   //------------------------------------------------------------------
00502 
00503   template <class T, class EnqPolicy>
00504   ConcurrentQueue<T,EnqPolicy>::ConcurrentQueue
00505   (
00506     SizeType maxSize,
00507     detail::MemoryType maxMemory
00508   ) :
00509     protectElements_(),
00510     elements_(),
00511     capacity_(maxSize),
00512     size_(0),
00513     memory_(maxMemory),
00514     used_(0),
00515     elementsDropped_(0)
00516   {}
00517 
00518   template <class T, class EnqPolicy>
00519   ConcurrentQueue<T,EnqPolicy>::~ConcurrentQueue()
00520   {
00521     LockType lock(protectElements_);
00522     elements_.clear();
00523     size_ = 0;
00524     used_ = 0;
00525     elementsDropped_ = 0;
00526   }
00527 
00528   template <class T, class EnqPolicy>
00529   typename EnqPolicy::ReturnType
00530   ConcurrentQueue<T,EnqPolicy>::enqNowait(T const& item)
00531   {
00532     LockType lock(protectElements_);
00533     return EnqPolicy::doEnq
00534       (item, elements_, size_, capacity_, used_, memory_,
00535         elementsDropped_, queueNotEmpty_);
00536   }
00537 
00538   template <class T, class EnqPolicy>
00539   void
00540   ConcurrentQueue<T,EnqPolicy>::enqWait(T const& item)
00541   {
00542     LockType lock(protectElements_);
00543     while ( isFull() ) queueNotFull_.wait(lock);
00544     EnqPolicy::doInsert(item, elements_, size_,
00545       detail::memoryUsage(item), used_, queueNotEmpty_);
00546   }
00547 
00548   template <class T, class EnqPolicy>
00549   bool
00550   ConcurrentQueue<T,EnqPolicy>::enqTimedWait
00551   (
00552     T const& item, 
00553     boost::posix_time::time_duration const& waitTime
00554   )
00555   {
00556     LockType lock(protectElements_);
00557     if ( isFull() )
00558     {
00559       queueNotFull_.timed_wait(lock, waitTime);
00560     }
00561     return insertIfPossible(item);
00562   }
00563 
00564   template <class T, class EnqPolicy>
00565   bool
00566   ConcurrentQueue<T,EnqPolicy>::deqNowait(ValueType& item)
00567   {
00568     LockType lock(protectElements_);
00569     return removeHeadIfPossible(item);
00570   }
00571 
00572   template <class T, class EnqPolicy>
00573   void
00574   ConcurrentQueue<T,EnqPolicy>::deqWait(ValueType& item)
00575   {
00576     LockType lock(protectElements_);
00577     while (size_ == 0) queueNotEmpty_.wait(lock);
00578     removeHead(item);
00579   }
00580 
00581   template <class T, class EnqPolicy>
00582   bool
00583   ConcurrentQueue<T,EnqPolicy>::deqTimedWait
00584   (
00585     ValueType& item,
00586     boost::posix_time::time_duration const& waitTime
00587   )
00588   {
00589     LockType lock(protectElements_);
00590     if (size_ == 0)
00591     {
00592       queueNotEmpty_.timed_wait(lock, waitTime);
00593     }
00594     return removeHeadIfPossible(item);
00595   }
00596 
00597   template <class T, class EnqPolicy>
00598   bool 
00599   ConcurrentQueue<T,EnqPolicy>::empty() const
00600   {
00601     // No lock is necessary: the read is atomic.
00602     return size_ == 0;
00603   }
00604 
00605   template <class T, class EnqPolicy>
00606   bool
00607   ConcurrentQueue<T,EnqPolicy>::full() const
00608   {
00609     LockType lock(protectElements_);
00610     return isFull();
00611   }
00612 
00613   template <class T, class EnqPolicy>
00614   typename ConcurrentQueue<T,EnqPolicy>::SizeType 
00615   ConcurrentQueue<T,EnqPolicy>::size() const
00616   {
00617     // No lock is necessary: the read is atomic.
00618     return size_;
00619   }
00620 
00621   template <class T, class EnqPolicy>
00622   typename ConcurrentQueue<T,EnqPolicy>::SizeType
00623   ConcurrentQueue<T,EnqPolicy>::capacity() const
00624   {
00625     // No lock is necessary: the read is atomic.
00626     return capacity_;
00627   }
00628 
00629   template <class T, class EnqPolicy>
00630   bool
00631   ConcurrentQueue<T,EnqPolicy>::setCapacity(SizeType newcapacity)
00632   {
00633     LockType lock(protectElements_);
00634     bool isEmpty = (size_ == 0);
00635     if (isEmpty) capacity_ = newcapacity;
00636     return isEmpty;
00637   }
00638 
00639   template <class T, class EnqPolicy>
00640   detail::MemoryType 
00641   ConcurrentQueue<T,EnqPolicy>::used() const
00642   {
00643     // No lock is necessary: the read is atomic.
00644     return used_;
00645   }
00646 
00647   template <class T, class EnqPolicy>
00648   detail::MemoryType
00649   ConcurrentQueue<T,EnqPolicy>::memory() const
00650   {
00651     // No lock is necessary: the read is atomic.
00652     return memory_;
00653   }
00654 
00655   template <class T, class EnqPolicy>
00656   bool
00657   ConcurrentQueue<T,EnqPolicy>::setMemory(detail::MemoryType newmemory)
00658   {
00659     LockType lock(protectElements_);
00660     bool isEmpty = (size_ == 0);
00661     if (isEmpty) memory_ = newmemory;
00662     return isEmpty;
00663   }
00664 
00665   template <class T, class EnqPolicy>
00666   typename ConcurrentQueue<T,EnqPolicy>::SizeType
00667   ConcurrentQueue<T,EnqPolicy>::clear()
00668   {
00669     LockType lock(protectElements_);
00670     SizeType clearedEvents = size_;
00671     elementsDropped_ += size_;
00672     elements_.clear();
00673     size_ = 0;
00674     used_ = 0;
00675     return clearedEvents;
00676   }
00677   
00678   template <class T, class EnqPolicy>
00679   void 
00680   ConcurrentQueue<T,EnqPolicy>::addExternallyDroppedEvents(SizeType n)
00681   {
00682     LockType lock(protectElements_);
00683     elementsDropped_ += n;
00684   }
00685 
00686   //-----------------------------------------------------------
00687   // Private member functions
00688   //-----------------------------------------------------------
00689 
00690   template <class T, class EnqPolicy>
00691   bool
00692   ConcurrentQueue<T,EnqPolicy>::insertIfPossible(T const& item)
00693   {
00694     if ( isFull() )
00695     {
00696       ++elementsDropped_;
00697       return false;
00698     }
00699     else
00700     {
00701       EnqPolicy::doInsert(item, elements_, size_,
00702       detail::memoryUsage(item), used_, queueNotEmpty_);
00703       return true;
00704     }
00705   }
00706 
00707   template <class T, class EnqPolicy>
00708   bool
00709   ConcurrentQueue<T,EnqPolicy>::removeHeadIfPossible(ValueType& item)
00710   {
00711     if (size_ == 0) return false;
00712 
00713     removeHead(item);
00714     return true;
00715   }
00716 
00717   template <class T, class EnqPolicy>
00718   void
00719   ConcurrentQueue<T,EnqPolicy>::removeHead(ValueType& item)
00720   {
00721     SequenceType holder;
00722     // Move the item out of elements_ in a manner that will not throw.
00723     holder.splice(holder.begin(), elements_, elements_.begin());
00724     // Record the change in the length of elements_.
00725     --size_;
00726     queueNotFull_.notify_one();
00727 
00728     assignItem(item, holder.front());
00729     used_ -= detail::memoryUsage( item );
00730   }
00731   
00732   template <class T, class EnqPolicy>
00733   void
00734   ConcurrentQueue<T,EnqPolicy>::assignItem(T& item, const T& element)
00735   {
00736     item = element;
00737   }
00738   
00739   template <class T, class EnqPolicy>
00740   void
00741   ConcurrentQueue<T,EnqPolicy>::assignItem(std::pair<T,size_t>& item, const T& element)
00742   {
00743     item.first = element;
00744     item.second = elementsDropped_;
00745     elementsDropped_ = 0;
00746   }
00747   
00748   template <class T, class EnqPolicy>
00749   bool
00750   ConcurrentQueue<T,EnqPolicy>::isFull() const
00751   {
00752     if (size_ >= capacity_ || used_ >= memory_) return true;
00753     return false;
00754   }
00755 
00756 } // namespace stor
00757 
00758 #endif // EventFilter_StorageManager_ConcurrentQueue_h
00759 
00766