CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/EventFilter/StorageManager/interface/ExpirableQueue.h

Go to the documentation of this file.
00001 // $Id: ExpirableQueue.h,v 1.8 2011/03/07 15:31:31 mommsen Exp $
00003 
00004 
00005 #ifndef EventFilter_StorageManager_ExpirableQueue_h
00006 #define EventFilter_StorageManager_ExpirableQueue_h
00007 
00008 #include "EventFilter/StorageManager/interface/ConcurrentQueue.h"
00009 #include "EventFilter/StorageManager/interface/EnquingPolicyTag.h"
00010 #include "EventFilter/StorageManager/interface/QueueID.h"
00011 #include "EventFilter/StorageManager/interface/Utils.h"
00012 
00013 namespace stor
00014 {
00026   template <class T, class Policy>
00027   class ExpirableQueue
00028   {
00029   public:
00030     typedef Policy PolicyType; // publish template parameter
00031     typedef typename ConcurrentQueue<T, Policy>::SizeType SizeType;
00032     typedef typename Policy::ValueType ValueType;
00033 
00038     explicit ExpirableQueue
00039     (
00040       SizeType maxsize=std::numeric_limits<SizeType>::max(),
00041       utils::Duration_t stalenessInterval = boost::posix_time::seconds(120),
00042       utils::TimePoint_t now = utils::getCurrentTime()
00043     );
00052     bool deqNowait(ValueType&);
00053 
00061     SizeType enqNowait
00062     (
00063       T const&,
00064       const utils::TimePoint_t& now = utils::getCurrentTime()
00065     );
00066 
00070     void setStalenessInterval(const utils::Duration_t&);
00071 
00075     utils::Duration_t stalenessInterval() const;    
00076 
00080     SizeType clear();
00081 
00085     bool empty() const;
00086 
00090     bool full() const;
00091 
00095     bool stale(const utils::TimePoint_t&) const;
00096 
00100     SizeType size() const;
00101 
00108     bool clearIfStale(const utils::TimePoint_t&, SizeType& clearedEvents);
00109 
00110   private:
00111     typedef ConcurrentQueue<T, Policy> queue_t;
00112 
00113     queue_t events_;
00115     utils::Duration_t stalenessInterval_;
00117     utils::TimePoint_t stalenessTime_;
00118 
00119     /*
00120       The following are not implemented, to prevent copying and
00121       assignment.
00122      */
00123     ExpirableQueue(ExpirableQueue&);
00124     ExpirableQueue& operator=(ExpirableQueue&);
00125   };
00126 
00127   
00128   template <class T, class Policy>
00129   ExpirableQueue<T, Policy>::ExpirableQueue
00130   (
00131     SizeType maxsize,
00132     utils::Duration_t stalenessInterval,
00133     utils::TimePoint_t now
00134   ) :
00135     events_(maxsize),
00136     stalenessInterval_(stalenessInterval),
00137     stalenessTime_(now+stalenessInterval_)
00138   {
00139   }
00140 
00141   template <class T, class Policy>
00142   bool
00143   ExpirableQueue<T, Policy>::deqNowait(ValueType& event)
00144   {
00145     stalenessTime_ = utils::getCurrentTime() + stalenessInterval_;
00146     return events_.deqNowait(event);
00147   }
00148 
00149   template <class T, class Policy>
00150   typename ExpirableQueue<T, Policy>::SizeType
00151   ExpirableQueue<T, Policy>::enqNowait(T const& event, const utils::TimePoint_t& now)
00152   {
00153     if ( stale(now) )
00154     {
00155       events_.addExternallyDroppedEvents(1);
00156       return 1;
00157     }
00158     return events_.enqNowait(event);
00159   }  
00160 
00161   template <class T, class Policy>
00162   inline
00163   void
00164   ExpirableQueue<T, Policy>::setStalenessInterval(const utils::Duration_t& t)
00165   {
00166     stalenessInterval_ = t;
00167   }
00168 
00169   template <class T, class Policy>
00170   inline
00171   utils::Duration_t
00172   ExpirableQueue<T, Policy>::stalenessInterval() const
00173   {
00174     return stalenessInterval_;
00175   }
00176 
00177   template <class T, class Policy>
00178   inline
00179   typename ExpirableQueue<T, Policy>::SizeType
00180   ExpirableQueue<T, Policy>::clear()
00181   {
00182     return events_.clear();
00183   }  
00184 
00185   template <class T, class Policy>
00186   inline
00187   bool
00188   ExpirableQueue<T, Policy>::empty() const
00189   {
00190     return events_.empty();
00191   }
00192 
00193   template <class T, class Policy>
00194   inline
00195   typename ExpirableQueue<T, Policy>::SizeType
00196   ExpirableQueue<T, Policy>::size() const
00197   {
00198     return events_.size();
00199   }
00200 
00201   template <class T, class Policy>
00202   inline
00203   bool
00204   ExpirableQueue<T, Policy>::full() const
00205   {
00206     return events_.full();
00207   }
00208 
00209   template <class T, class Policy>
00210   inline
00211   bool
00212   ExpirableQueue<T, Policy>::stale(const utils::TimePoint_t& now) const
00213   {
00214     return (stalenessTime_ < now);
00215   }
00216 
00217   template <class T, class Policy>
00218   inline
00219   bool
00220   ExpirableQueue<T, Policy>::clearIfStale
00221   (
00222     const utils::TimePoint_t& now,
00223     SizeType& clearedEvents
00224   )
00225   {
00226     if (stalenessTime_ < now)
00227     {
00228       clearedEvents = clear();
00229       return true;
00230     }
00231     else
00232     {
00233       clearedEvents = 0;
00234       return false;
00235     }
00236   }
00237 
00238 } // namespace stor
00239   
00240 
00241 #endif // EventFilter_StorageManager_ExpirableQueue_h
00242 
00249