CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/EventFilter/StorageManager/interface/ConcurrentQueue.h

Go to the documentation of this file.
00001 // $Id: ConcurrentQueue.h,v 1.15 2013/01/07 11:16:27 eulisse Exp $
00003 
00004 
00005 #ifndef EventFilter_StorageManager_ConcurrentQueue_h
00006 #define EventFilter_StorageManager_ConcurrentQueue_h
00007 
#include <algorithm>
#include <cstddef>
#include <exception>
#include <limits>
#include <list>
#include <utility>

#include <iostream> // debugging

#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread_time.hpp>
00020 
00021 namespace stor
00022 {
00050   namespace detail
00051   {
00052     typedef size_t MemoryType;
00053 
00054     /*
00055       This template is using SFINAE to figure out if the class used to
00056       instantiate the ConcurrentQueue template has a method memoryUsed
00057       returning the number of bytes occupied by the class itself.
00058     */
00059     template <typename T>
00060     class hasMemoryUsed
00061     {
00062       typedef char TrueType;
00063       struct FalseType{ TrueType _[2]; };
00064       
00065       template <MemoryType (T::*)() const>
00066       struct TestConst;
00067       
00068       template <typename C>
00069       static TrueType test( TestConst<&C::memoryUsed>* );
00070       template <typename C>
00071       static FalseType test(...);
00072       
00073     public:
00074       static const bool value = (sizeof(test<T>(0)) == sizeof(TrueType));
00075     };
00076     
00077     template <typename T>
00078     MemoryType 
00079     memoryUsage(const std::pair<T,size_t>& t)
00080     {
00081       MemoryType usage(0UL);
00082       try
00083       {
00084         usage = t.first.memoryUsed();
00085       }
00086       catch(...)
00087       {}
00088       return usage;
00089     }
00090     
00091     template <typename T>
00092     typename boost::enable_if<hasMemoryUsed<T>, MemoryType>::type
00093     memoryUsage(const T& t)
00094     {
00095       MemoryType usage(0UL);
00096       try
00097       {
00098         usage = t.memoryUsed();
00099       }
00100       catch(...)
00101       {}
00102       return usage;
00103     }
00104   
00105     template <typename T>
00106     typename boost::disable_if<hasMemoryUsed<T>, MemoryType>::type
00107     memoryUsage(const T& t)
00108     { return sizeof(T); }
00109 
00110   }// end namespace detail
00111 
00112   
00113   template <class T>
00114   struct FailIfFull
00115   {
00116     typedef void ReturnType;
00117 
00118     typedef T ValueType;
00119     typedef std::list<T> SequenceType;
00120     typedef typename SequenceType::size_type SizeType;
00121 
00122     static const struct QueueIsFull : public std::exception
00123     {
00124       QueueIsFull() {};
00125       virtual const char* what() const throw()
00126       {
00127         return "Cannot add item to a full queue";
00128       }
00129     } queueIsFull;
00130 
00131     static void doInsert
00132     (
00133       T const& item,
00134       SequenceType& elements,
00135       SizeType& size,
00136       detail::MemoryType const& itemSize,
00137       detail::MemoryType& used,
00138       boost::condition& nonempty
00139     )
00140     {
00141       elements.push_back(item);
00142       ++size;
00143       used += itemSize;
00144       nonempty.notify_one();
00145     }
00146                
00147     static ReturnType doEnq
00148     (
00149       T const& item,
00150       SequenceType& elements,
00151       SizeType& size,
00152       SizeType& capacity,
00153       detail::MemoryType& used,
00154       detail::MemoryType& memory,
00155       size_t& elementsDropped,
00156       boost::condition& nonempty
00157     )
00158     {
00159       detail::MemoryType itemSize = detail::memoryUsage(item);
00160       if (size >= capacity || used+itemSize > memory)
00161       {
00162         ++elementsDropped;
00163         throw queueIsFull;
00164       }
00165       else
00166       {
00167         doInsert(item, elements, size, itemSize, used, nonempty);
00168       }
00169     }         
00170   };
00171   template <class T>
00172   const typename FailIfFull<T>::QueueIsFull FailIfFull<T>::queueIsFull;
00173 
00174   template <class T>
00175   struct KeepNewest
00176   {
00177     typedef std::pair<T,size_t> ValueType;
00178     typedef std::list<T> SequenceType;
00179     typedef typename SequenceType::size_type SizeType;
00180     typedef SizeType ReturnType;
00181 
00182     static void doInsert
00183     (
00184       T const& item,
00185       SequenceType& elements,
00186       SizeType& size,
00187       detail::MemoryType const& itemSize,
00188       detail::MemoryType& used,
00189       boost::condition& nonempty
00190     )
00191     {
00192       elements.push_back(item);
00193       ++size;
00194       used += itemSize;
00195       nonempty.notify_one();
00196     }
00197 
00198     static ReturnType doEnq
00199     (
00200       T const& item,
00201       SequenceType& elements,
00202       SizeType& size,
00203       SizeType& capacity,
00204       detail::MemoryType& used,
00205       detail::MemoryType& memory,
00206       size_t& elementsDropped,
00207       boost::condition& nonempty
00208     )
00209     {
00210       SizeType elementsRemoved(0);
00211       detail::MemoryType itemSize = detail::memoryUsage(item);
00212       while ( (size==capacity || used+itemSize > memory) && !elements.empty() )
00213       {
00214         SequenceType holder;
00215         // Move the item out of elements in a manner that will not throw.
00216         holder.splice(holder.begin(), elements, elements.begin());
00217         // Record the change in the length of elements.
00218         --size;
00219         used -= detail::memoryUsage( holder.front() );
00220         ++elementsRemoved;
00221       }
00222       if (size < capacity && used+itemSize <= memory)
00223         // we succeeded to make enough room for the new element
00224       {
00225         doInsert(item, elements, size, itemSize, used, nonempty);
00226       }
00227       else
00228       {
00229         // we cannot add the new element
00230         ++elementsRemoved;
00231       }
00232       elementsDropped += elementsRemoved;
00233       return elementsRemoved;
00234     }
00235   };
00236 
00237 
00238   template <class T>
00239   struct RejectNewest
00240   {
00241     typedef std::pair<T,size_t> ValueType;
00242     typedef std::list<T> SequenceType;
00243     typedef typename SequenceType::size_type SizeType;
00244     typedef SizeType ReturnType;
00245 
00246     static void doInsert
00247     (
00248       T const& item,
00249       SequenceType& elements,
00250       SizeType& size,
00251       detail::MemoryType const& itemSize,
00252       detail::MemoryType& used,
00253       boost::condition& nonempty
00254     )
00255     {
00256       elements.push_back(item);
00257       ++size;
00258       used += itemSize;
00259       nonempty.notify_one();
00260     }
00261 
00262     static ReturnType doEnq
00263     (
00264       T const& item,
00265       SequenceType& elements,
00266       SizeType& size,
00267       SizeType& capacity,
00268       detail::MemoryType& used,
00269       detail::MemoryType& memory,
00270       size_t& elementsDropped,
00271       boost::condition& nonempty
00272     )
00273     {
00274       detail::MemoryType itemSize = detail::memoryUsage(item);
00275       if (size < capacity && used+itemSize <= memory)
00276       {
00277         doInsert(item, elements, size, itemSize, used, nonempty);
00278         return 0;
00279       }
00280       ++elementsDropped;
00281       return 1;
00282     }
00283   };
00284 
00289   template <class T, class EnqPolicy=FailIfFull<T> >
00290   class ConcurrentQueue
00291   {
00292   public:
00293     typedef typename EnqPolicy::ValueType ValueType;
00294     typedef typename EnqPolicy::SequenceType SequenceType;
00295     typedef typename SequenceType::size_type SizeType;
00296 
00301     explicit ConcurrentQueue
00302     (
00303       SizeType maxSize = std::numeric_limits<SizeType>::max(),
00304       detail::MemoryType maxMemory = std::numeric_limits<detail::MemoryType>::max()
00305     );
00306 
00313     ~ConcurrentQueue();
00314 
00327     typename EnqPolicy::ReturnType enqNowait(T const& item);
00328 
00334     void enqWait(T const& p);
00335 
00343     bool enqTimedWait(T const& p, boost::posix_time::time_duration const&);
00344 
00352     bool deqNowait(ValueType&);
00353 
00360     void deqWait(ValueType&);
00361 
00370     bool deqTimedWait(ValueType&, boost::posix_time::time_duration const&);
00371 
00375     bool empty() const;
00376 
00380     bool full() const;
00381 
00386     SizeType size() const;
00387 
00392     SizeType capacity() const;
00393 
00399     bool setCapacity(SizeType n);
00400 
00404     detail::MemoryType used() const;
00405 
00410     detail::MemoryType memory() const;
00411 
00418     bool setMemory(detail::MemoryType n);
00419 
00425     SizeType clear();
00426 
00430     void addExternallyDroppedEvents(SizeType);
00431     
00432 
00433   private:
00434     typedef boost::mutex::scoped_lock LockType;
00435 
00436     mutable boost::mutex protectElements_;
00437     mutable boost::condition queueNotEmpty_;
00438     mutable boost::condition queueNotFull_;
00439 
00440     SequenceType elements_;
00441     SizeType capacity_;
00442     SizeType size_;
00443     /*
00444       N.B.: we rely on SizeType *not* being some synthesized large
00445       type, so that reading the value is an atomic action, as is
00446       incrementing or decrementing the value. We do *not* assume that
00447       there is any atomic getAndIncrement or getAndDecrement
00448       operation.
00449     */
00450     detail::MemoryType memory_;
00451     detail::MemoryType used_;
00452     size_t elementsDropped_;
00453 
00454     /*
00455       These private member functions assume that whatever locks
00456       necessary for safe operation have already been obtained.
00457      */
00458 
00459     /*
00460       Insert the given item into the list, if it is not already full,
00461       and increment size. Return true if the item is inserted, and
00462       false if not.
00463     */
00464     bool insertIfPossible(T const& item);
00465 
00466     /*
00467       Remove the object at the head of the queue, if there is one, and
00468       assign item the value of this object.The assignment may throw an
00469       exception; even if it does, the head will have been removed from
00470       the queue, and the size appropriately adjusted. It is assumed
00471       the queue is nonempty. Return true if the queue was nonempty,
00472       and false if the queue was empty.
00473      */
00474     bool removeHeadIfPossible(ValueType& item);
00475 
00476     /*
00477       Remove the object at the head of the queue, and assign item the
00478       value of this object. The assignment may throw an exception;
00479       even if it does, the head will have been removed from the queue,
00480       and the size appropriately adjusted. It is assumed the queue is
00481       nonempty.
00482      */
00483     void removeHead(ValueType& item);
00484 
00485     void assignItem(T& item, const T& element);
00486     void assignItem(std::pair<T,size_t>& item, const T& element);
00487 
00488     /*
00489       Return false if the queue can accept new entries.
00490      */
00491     bool isFull() const;
00492 
00493     /*
00494       These functions are declared private and not implemented to
00495       prevent their use.
00496      */
00497     ConcurrentQueue(ConcurrentQueue<T,EnqPolicy> const&);
00498     ConcurrentQueue& operator=(ConcurrentQueue<T,EnqPolicy> const&);
00499   };
00500 
00501   //------------------------------------------------------------------
00502   // Implementation follows
00503   //------------------------------------------------------------------
00504 
00505   template <class T, class EnqPolicy>
00506   ConcurrentQueue<T,EnqPolicy>::ConcurrentQueue
00507   (
00508     SizeType maxSize,
00509     detail::MemoryType maxMemory
00510   ) :
00511     protectElements_(),
00512     elements_(),
00513     capacity_(maxSize),
00514     size_(0),
00515     memory_(maxMemory),
00516     used_(0),
00517     elementsDropped_(0)
00518   {}
00519 
00520   template <class T, class EnqPolicy>
00521   ConcurrentQueue<T,EnqPolicy>::~ConcurrentQueue()
00522   {
00523     LockType lock(protectElements_);
00524     elements_.clear();
00525     size_ = 0;
00526     used_ = 0;
00527     elementsDropped_ = 0;
00528   }
00529 
00530   template <class T, class EnqPolicy>
00531   typename EnqPolicy::ReturnType
00532   ConcurrentQueue<T,EnqPolicy>::enqNowait(T const& item)
00533   {
00534     LockType lock(protectElements_);
00535     return EnqPolicy::doEnq
00536       (item, elements_, size_, capacity_, used_, memory_,
00537         elementsDropped_, queueNotEmpty_);
00538   }
00539 
00540   template <class T, class EnqPolicy>
00541   void
00542   ConcurrentQueue<T,EnqPolicy>::enqWait(T const& item)
00543   {
00544     LockType lock(protectElements_);
00545     while ( isFull() ) queueNotFull_.wait(lock);
00546     EnqPolicy::doInsert(item, elements_, size_,
00547       detail::memoryUsage(item), used_, queueNotEmpty_);
00548   }
00549 
00550   template <class T, class EnqPolicy>
00551   bool
00552   ConcurrentQueue<T,EnqPolicy>::enqTimedWait
00553   (
00554     T const& item, 
00555     boost::posix_time::time_duration const& waitTime
00556   )
00557   {
00558     LockType lock(protectElements_);
00559     if ( isFull() )
00560     {
00561       queueNotFull_.timed_wait(lock, waitTime);
00562     }
00563     return insertIfPossible(item);
00564   }
00565 
00566   template <class T, class EnqPolicy>
00567   bool
00568   ConcurrentQueue<T,EnqPolicy>::deqNowait(ValueType& item)
00569   {
00570     LockType lock(protectElements_);
00571     return removeHeadIfPossible(item);
00572   }
00573 
00574   template <class T, class EnqPolicy>
00575   void
00576   ConcurrentQueue<T,EnqPolicy>::deqWait(ValueType& item)
00577   {
00578     LockType lock(protectElements_);
00579     while (size_ == 0) queueNotEmpty_.wait(lock);
00580     removeHead(item);
00581   }
00582 
00583   template <class T, class EnqPolicy>
00584   bool
00585   ConcurrentQueue<T,EnqPolicy>::deqTimedWait
00586   (
00587     ValueType& item,
00588     boost::posix_time::time_duration const& waitTime
00589   )
00590   {
00591     LockType lock(protectElements_);
00592     if (size_ == 0)
00593     {
00594       queueNotEmpty_.timed_wait(lock, waitTime);
00595     }
00596     return removeHeadIfPossible(item);
00597   }
00598 
00599   template <class T, class EnqPolicy>
00600   bool 
00601   ConcurrentQueue<T,EnqPolicy>::empty() const
00602   {
00603     // No lock is necessary: the read is atomic.
00604     return size_ == 0;
00605   }
00606 
00607   template <class T, class EnqPolicy>
00608   bool
00609   ConcurrentQueue<T,EnqPolicy>::full() const
00610   {
00611     LockType lock(protectElements_);
00612     return isFull();
00613   }
00614 
00615   template <class T, class EnqPolicy>
00616   typename ConcurrentQueue<T,EnqPolicy>::SizeType 
00617   ConcurrentQueue<T,EnqPolicy>::size() const
00618   {
00619     // No lock is necessary: the read is atomic.
00620     return size_;
00621   }
00622 
00623   template <class T, class EnqPolicy>
00624   typename ConcurrentQueue<T,EnqPolicy>::SizeType
00625   ConcurrentQueue<T,EnqPolicy>::capacity() const
00626   {
00627     // No lock is necessary: the read is atomic.
00628     return capacity_;
00629   }
00630 
00631   template <class T, class EnqPolicy>
00632   bool
00633   ConcurrentQueue<T,EnqPolicy>::setCapacity(SizeType newcapacity)
00634   {
00635     LockType lock(protectElements_);
00636     bool isEmpty = (size_ == 0);
00637     if (isEmpty) capacity_ = newcapacity;
00638     return isEmpty;
00639   }
00640 
00641   template <class T, class EnqPolicy>
00642   detail::MemoryType 
00643   ConcurrentQueue<T,EnqPolicy>::used() const
00644   {
00645     // No lock is necessary: the read is atomic.
00646     return used_;
00647   }
00648 
00649   template <class T, class EnqPolicy>
00650   detail::MemoryType
00651   ConcurrentQueue<T,EnqPolicy>::memory() const
00652   {
00653     // No lock is necessary: the read is atomic.
00654     return memory_;
00655   }
00656 
00657   template <class T, class EnqPolicy>
00658   bool
00659   ConcurrentQueue<T,EnqPolicy>::setMemory(detail::MemoryType newmemory)
00660   {
00661     LockType lock(protectElements_);
00662     bool isEmpty = (size_ == 0);
00663     if (isEmpty) memory_ = newmemory;
00664     return isEmpty;
00665   }
00666 
00667   template <class T, class EnqPolicy>
00668   typename ConcurrentQueue<T,EnqPolicy>::SizeType
00669   ConcurrentQueue<T,EnqPolicy>::clear()
00670   {
00671     LockType lock(protectElements_);
00672     SizeType clearedEvents = size_;
00673     elementsDropped_ += size_;
00674     elements_.clear();
00675     size_ = 0;
00676     used_ = 0;
00677     return clearedEvents;
00678   }
00679   
00680   template <class T, class EnqPolicy>
00681   void 
00682   ConcurrentQueue<T,EnqPolicy>::addExternallyDroppedEvents(SizeType n)
00683   {
00684     LockType lock(protectElements_);
00685     elementsDropped_ += n;
00686   }
00687 
00688   //-----------------------------------------------------------
00689   // Private member functions
00690   //-----------------------------------------------------------
00691 
00692   template <class T, class EnqPolicy>
00693   bool
00694   ConcurrentQueue<T,EnqPolicy>::insertIfPossible(T const& item)
00695   {
00696     if ( isFull() )
00697     {
00698       ++elementsDropped_;
00699       return false;
00700     }
00701     else
00702     {
00703       EnqPolicy::doInsert(item, elements_, size_,
00704       detail::memoryUsage(item), used_, queueNotEmpty_);
00705       return true;
00706     }
00707   }
00708 
00709   template <class T, class EnqPolicy>
00710   bool
00711   ConcurrentQueue<T,EnqPolicy>::removeHeadIfPossible(ValueType& item)
00712   {
00713     if (size_ == 0) return false;
00714 
00715     removeHead(item);
00716     return true;
00717   }
00718 
00719   template <class T, class EnqPolicy>
00720   void
00721   ConcurrentQueue<T,EnqPolicy>::removeHead(ValueType& item)
00722   {
00723     SequenceType holder;
00724     // Move the item out of elements_ in a manner that will not throw.
00725     holder.splice(holder.begin(), elements_, elements_.begin());
00726     // Record the change in the length of elements_.
00727     --size_;
00728     queueNotFull_.notify_one();
00729 
00730     assignItem(item, holder.front());
00731     used_ -= detail::memoryUsage( item );
00732   }
00733   
00734   template <class T, class EnqPolicy>
00735   void
00736   ConcurrentQueue<T,EnqPolicy>::assignItem(T& item, const T& element)
00737   {
00738     item = element;
00739   }
00740   
00741   template <class T, class EnqPolicy>
00742   void
00743   ConcurrentQueue<T,EnqPolicy>::assignItem(std::pair<T,size_t>& item, const T& element)
00744   {
00745     item.first = element;
00746     item.second = elementsDropped_;
00747     elementsDropped_ = 0;
00748   }
00749   
00750   template <class T, class EnqPolicy>
00751   bool
00752   ConcurrentQueue<T,EnqPolicy>::isFull() const
00753   {
00754     if (size_ >= capacity_ || used_ >= memory_) return true;
00755     return false;
00756   }
00757 
00758 } // namespace stor
00759 
00760 #endif // EventFilter_StorageManager_ConcurrentQueue_h
00761 
00768