Go to the documentation of this file.00001
00003
00004
00005 #ifndef EventFilter_StorageManager_ExpirableQueue_h
00006 #define EventFilter_StorageManager_ExpirableQueue_h
00007
00008 #include "EventFilter/StorageManager/interface/ConcurrentQueue.h"
00009 #include "EventFilter/StorageManager/interface/EnquingPolicyTag.h"
00010 #include "EventFilter/StorageManager/interface/QueueID.h"
00011 #include "EventFilter/StorageManager/interface/Utils.h"
00012
00013 namespace stor
00014 {
00026 template <class T, class Policy>
00027 class ExpirableQueue
00028 {
00029 public:
00030 typedef Policy PolicyType;
00031 typedef typename ConcurrentQueue<T, Policy>::SizeType SizeType;
00032 typedef typename Policy::ValueType ValueType;
00033
00038 explicit ExpirableQueue
00039 (
00040 SizeType maxsize=std::numeric_limits<SizeType>::max(),
00041 utils::Duration_t stalenessInterval = boost::posix_time::seconds(120),
00042 utils::TimePoint_t now = utils::getCurrentTime()
00043 );
00052 bool deqNowait(ValueType&);
00053
00061 SizeType enqNowait
00062 (
00063 T const&,
00064 const utils::TimePoint_t& now = utils::getCurrentTime()
00065 );
00066
00070 void setStalenessInterval(const utils::Duration_t&);
00071
00075 utils::Duration_t stalenessInterval() const;
00076
00080 SizeType clear();
00081
00085 bool empty() const;
00086
00090 bool full() const;
00091
00095 bool stale(const utils::TimePoint_t&) const;
00096
00100 SizeType size() const;
00101
00108 bool clearIfStale(const utils::TimePoint_t&, SizeType& clearedEvents);
00109
00110 private:
00111 typedef ConcurrentQueue<T, Policy> queue_t;
00112
00113 queue_t events_;
00115 utils::Duration_t stalenessInterval_;
00117 utils::TimePoint_t stalenessTime_;
00118
00119
00120
00121
00122
00123 ExpirableQueue(ExpirableQueue&);
00124 ExpirableQueue& operator=(ExpirableQueue&);
00125 };
00126
00127
00128 template <class T, class Policy>
00129 ExpirableQueue<T, Policy>::ExpirableQueue
00130 (
00131 SizeType maxsize,
00132 utils::Duration_t stalenessInterval,
00133 utils::TimePoint_t now
00134 ) :
00135 events_(maxsize),
00136 stalenessInterval_(stalenessInterval),
00137 stalenessTime_(now+stalenessInterval_)
00138 {
00139 }
00140
00141 template <class T, class Policy>
00142 bool
00143 ExpirableQueue<T, Policy>::deqNowait(ValueType& event)
00144 {
00145 stalenessTime_ = utils::getCurrentTime() + stalenessInterval_;
00146 return events_.deqNowait(event);
00147 }
00148
00149 template <class T, class Policy>
00150 typename ExpirableQueue<T, Policy>::SizeType
00151 ExpirableQueue<T, Policy>::enqNowait(T const& event, const utils::TimePoint_t& now)
00152 {
00153 if ( stale(now) )
00154 {
00155 events_.addExternallyDroppedEvents(1);
00156 return 1;
00157 }
00158 return events_.enqNowait(event);
00159 }
00160
00161 template <class T, class Policy>
00162 inline
00163 void
00164 ExpirableQueue<T, Policy>::setStalenessInterval(const utils::Duration_t& t)
00165 {
00166 stalenessInterval_ = t;
00167 }
00168
00169 template <class T, class Policy>
00170 inline
00171 utils::Duration_t
00172 ExpirableQueue<T, Policy>::stalenessInterval() const
00173 {
00174 return stalenessInterval_;
00175 }
00176
00177 template <class T, class Policy>
00178 inline
00179 typename ExpirableQueue<T, Policy>::SizeType
00180 ExpirableQueue<T, Policy>::clear()
00181 {
00182 return events_.clear();
00183 }
00184
00185 template <class T, class Policy>
00186 inline
00187 bool
00188 ExpirableQueue<T, Policy>::empty() const
00189 {
00190 return events_.empty();
00191 }
00192
00193 template <class T, class Policy>
00194 inline
00195 typename ExpirableQueue<T, Policy>::SizeType
00196 ExpirableQueue<T, Policy>::size() const
00197 {
00198 return events_.size();
00199 }
00200
00201 template <class T, class Policy>
00202 inline
00203 bool
00204 ExpirableQueue<T, Policy>::full() const
00205 {
00206 return events_.full();
00207 }
00208
00209 template <class T, class Policy>
00210 inline
00211 bool
00212 ExpirableQueue<T, Policy>::stale(const utils::TimePoint_t& now) const
00213 {
00214 return (stalenessTime_ < now);
00215 }
00216
00217 template <class T, class Policy>
00218 inline
00219 bool
00220 ExpirableQueue<T, Policy>::clearIfStale
00221 (
00222 const utils::TimePoint_t& now,
00223 SizeType& clearedEvents
00224 )
00225 {
00226 if (stalenessTime_ < now)
00227 {
00228 clearedEvents = clear();
00229 return true;
00230 }
00231 else
00232 {
00233 clearedEvents = 0;
00234 return false;
00235 }
00236 }
00237
00238 }
00239
00240
00241 #endif // EventFilter_StorageManager_ExpirableQueue_h
00242
00249