CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/EventFilter/StorageManager/interface/QueueCollection.h

Go to the documentation of this file.
00001 // $Id: QueueCollection.h,v 1.12 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #ifndef EventFilter_StorageManager_QueueCollection_h
00005 #define EventFilter_StorageManager_QueueCollection_h
00006 
00007 #include <vector>
00008 #include <limits>
00009 
00010 #include "boost/bind.hpp"
00011 #include "boost/thread/mutex.hpp"
00012 #include "boost/shared_ptr.hpp"
00013 
00014 #include "FWCore/Utilities/interface/Algorithms.h"
00015 
00016 #include "EventFilter/StorageManager/interface/ConcurrentQueue.h"
00017 #include "EventFilter/StorageManager/interface/ConsumerID.h"
00018 #include "EventFilter/StorageManager/interface/EnquingPolicyTag.h"
00019 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h"
00020 #include "EventFilter/StorageManager/interface/Exception.h"
00021 #include "EventFilter/StorageManager/interface/ExpirableQueue.h"
00022 #include "EventFilter/StorageManager/interface/QueueID.h"
00023 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h"
00024 #include "EventFilter/StorageManager/interface/Utils.h"
00025 #include "EventFilter/StorageManager/interface/ConsumerMonitorCollection.h"
00026 
00027 
00028 namespace stor {
00029 
00043   template <class T>
00044   class QueueCollection
00045   {
00046   public:
00047     typedef typename ExpirableQueue<T, RejectNewest<T> >::SizeType SizeType;
00048     typedef typename ExpirableQueue<T, RejectNewest<T> >::ValueType ValueType;
00049 
00053     QueueCollection(ConsumerMonitorCollection&);
00054 
00059     void setExpirationInterval(const QueueID&, const utils::Duration_t&);
00060     utils::Duration_t getExpirationInterval(const QueueID& id) const;
00061 
00067     QueueID createQueue
00068     (
00069       const EventConsRegPtr,
00070       const utils::TimePoint_t& now = utils::getCurrentTime()
00071     );
00072     QueueID createQueue
00073     (
00074       const RegPtr,
00075       const utils::TimePoint_t& now = utils::getCurrentTime()
00076     );
00077     
00082     void removeQueues();
00083 
00087     SizeType size() const;
00088     
00092     void addEvent(T const&);
00093 
00099     ValueType popEvent(const QueueID&);
00100 
00106     ValueType popEvent(const ConsumerID&);
00107 
00111     void clearQueue(const QueueID&);
00112 
00116     bool clearStaleQueues(const utils::TimePoint_t&);
00117 
00121     void clearQueues();
00122 
00126     bool empty(const QueueID&) const;
00127 
00131     bool full(const QueueID&) const;
00132 
00137     bool stale(const QueueID&, const utils::TimePoint_t&) const;
00138 
00142     bool allQueuesStale(const utils::TimePoint_t&) const;
00143 
00147     SizeType size(const QueueID&) const;
00148 
00149 
00150   private:
00151     typedef ExpirableQueue<T, RejectNewest<T> > ExpirableDiscardNewQueue_t;
00152     typedef ExpirableQueue<T, KeepNewest<T> > ExpirableDiscardOldQueue_t;
00153 
00154     typedef boost::shared_ptr<ExpirableDiscardNewQueue_t> 
00155             ExpirableDiscardNewQueuePtr;
00156     typedef boost::shared_ptr<ExpirableDiscardOldQueue_t> 
00157             ExpirableDiscardOldQueuePtr;
00158 
00159     // These typedefs need to be changed when we move to Boost 1.38
00160     typedef boost::mutex::scoped_lock ReadLock_t;
00161     typedef boost::mutex::scoped_lock WriteLock_t;
00162     typedef boost::mutex ReadWriteMutex_t;
00163 
00164     // It is possible that one mutex would be better than these
00165     // three. Only profiling the application will tell for sure.
00166     mutable ReadWriteMutex_t protectDiscardNewQueues_;
00167     mutable ReadWriteMutex_t protectDiscardOldQueues_;
00168     mutable ReadWriteMutex_t protectLookup_;
00169 
00170     typedef std::vector<ExpirableDiscardNewQueuePtr> DiscardNewQueues_t;
00171     DiscardNewQueues_t discardNewQueues_;
00172     typedef std::vector<ExpirableDiscardOldQueuePtr> DiscardOldQueues_t;
00173     DiscardOldQueues_t discardOldQueues_;
00174 
00175     typedef std::map<ConsumerID, QueueID> IDLookup_t;
00176     IDLookup_t queueIdLookup_;
00177     typedef std::map<EventConsRegPtr, QueueID,
00178                      utils::ptrComp<EventConsumerRegistrationInfo>
00179                      > ReginfoLookup_t;
00180     ReginfoLookup_t queueReginfoLookup_;
00181     ConsumerMonitorCollection& consumerMonitorCollection_;
00182 
00183     /*
00184       These functions are declared private and not implemented to
00185       prevent their use.
00186     */
00187     QueueCollection(QueueCollection const&);
00188     QueueCollection& operator=(QueueCollection const&);
00189 
00190     /*
00191       These are helper functions used in the implementation.
00192     */
00193     
00194     SizeType enqueueEvent_(QueueID const&, T const&, utils::TimePoint_t const&);
00195     QueueID getQueue(const RegPtr, const utils::TimePoint_t&);
00196 
00197   };
00198 
00199   //------------------------------------------------------------------
00200   // Implementation follows
00201   //------------------------------------------------------------------
00202 
00209   namespace
00210   {
00211     void throwUnknownQueueid(const QueueID& id)
00212     {
00213       std::ostringstream msg;
00214       msg << "Unable to retrieve queue with signature: ";
00215       msg << id;
00216       XCEPT_RAISE(exception::UnknownQueueId, msg.str());
00217     }
00218   } // anonymous namespace
00219   
00220   template <class T>
00221   QueueCollection<T>::QueueCollection(ConsumerMonitorCollection& ccp ) :
00222   protectDiscardNewQueues_(),
00223   protectDiscardOldQueues_(),
00224   protectLookup_(),
00225   discardNewQueues_(),
00226   discardOldQueues_(),
00227   queueIdLookup_(),
00228   consumerMonitorCollection_( ccp )
00229   { }
00230   
00231   template <class T>
00232   void
00233   QueueCollection<T>::setExpirationInterval(
00234     const QueueID& id,
00235     const utils::Duration_t& interval
00236   )
00237   {
00238     switch (id.policy()) 
00239     {
00240       case enquing_policy::DiscardNew:
00241       {
00242         ReadLock_t lock(protectDiscardNewQueues_);
00243         try
00244         {
00245           discardNewQueues_.at(id.index())->setStalenessInterval(interval);
00246         }
00247         catch(std::out_of_range)
00248         {
00249           throwUnknownQueueid(id);
00250         }
00251         break;
00252       }
00253       case enquing_policy::DiscardOld:
00254       {
00255         ReadLock_t lock(protectDiscardOldQueues_);
00256         try
00257         {
00258           discardOldQueues_.at(id.index())->setStalenessInterval(interval);
00259         }
00260         catch(std::out_of_range)
00261         {
00262           throwUnknownQueueid(id);
00263         }
00264         break;
00265       }
00266       default:
00267       {
00268         throwUnknownQueueid(id);
00269         // does not return, no break needed
00270       }
00271     }
00272   }
00273   
00274   template <class T>
00275   utils::Duration_t
00276   QueueCollection<T>::getExpirationInterval(const QueueID& id) const
00277   {
00278     utils::Duration_t result = boost::posix_time::seconds(0);
00279     switch (id.policy()) 
00280     {
00281       case enquing_policy::DiscardNew:
00282       {
00283         ReadLock_t lock(protectDiscardNewQueues_);
00284         try
00285         {
00286           result = discardNewQueues_.at(id.index())->stalenessInterval();
00287         }
00288         catch(std::out_of_range)
00289         {
00290           throwUnknownQueueid(id);
00291         }
00292         break;
00293       }
00294       case enquing_policy::DiscardOld:
00295       {
00296         ReadLock_t lock(protectDiscardOldQueues_);
00297         try
00298         {
00299           result = discardOldQueues_.at(id.index())->stalenessInterval();
00300         }
00301         catch(std::out_of_range)
00302         {
00303           throwUnknownQueueid(id);
00304         }
00305         break;
00306       }
00307       default:
00308       {
00309         throwUnknownQueueid(id);
00310         // does not return, no break needed
00311       }
00312     }
00313     return result;
00314   }
00315   
00316   template <class T>
00317   QueueID
00318   QueueCollection<T>::createQueue
00319   (
00320     const EventConsRegPtr reginfo,
00321     const utils::TimePoint_t& now
00322   )
00323   {
00324     QueueID qid;
00325     const ConsumerID& cid = reginfo->consumerId();
00326     
00327     // We don't proceed if the given ConsumerID is invalid, or if
00328     // we've already seen that value before.
00329     if (!cid.isValid()) return qid;
00330     WriteLock_t lockLookup(protectLookup_);
00331     if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
00332     
00333     if ( reginfo->uniqueEvents() )
00334     {
00335       // another consumer wants to share the
00336       // queue to get unique events.
00337       ReginfoLookup_t::const_iterator it =
00338         queueReginfoLookup_.find(reginfo);
00339       if ( it != queueReginfoLookup_.end() )
00340       {
00341         qid = it->second;
00342         queueIdLookup_[cid] = qid;
00343         return qid;
00344       }
00345     }
00346     
00347     qid = getQueue(reginfo, now);
00348     queueIdLookup_[cid] = qid;
00349     queueReginfoLookup_[reginfo] = qid;
00350     return qid;
00351   }
00352   
00353   template <class T>
00354   QueueID 
00355   QueueCollection<T>::createQueue
00356   (
00357     const RegPtr reginfo,
00358     const utils::TimePoint_t& now
00359   )
00360   {
00361     QueueID qid;
00362     const ConsumerID& cid = reginfo->consumerId();
00363 
00364     // We don't proceed if the given ConsumerID is invalid, or if
00365     // we've already seen that value before.
00366     if (!cid.isValid()) return qid;
00367     WriteLock_t lockLookup(protectLookup_);
00368     if (queueIdLookup_.find(cid) != queueIdLookup_.end()) return qid;
00369     qid = getQueue(reginfo, now);
00370     queueIdLookup_[cid] = qid;
00371     return qid;
00372   }
00373   
00374   template <class T>
00375   QueueID
00376   QueueCollection<T>::getQueue
00377   (
00378     const RegPtr reginfo,
00379     const utils::TimePoint_t& now
00380   )
00381   {
00382     if (reginfo->queuePolicy() == enquing_policy::DiscardNew)
00383     {
00384       WriteLock_t lock(protectDiscardNewQueues_);
00385       ExpirableDiscardNewQueuePtr newborn(
00386         new ExpirableDiscardNewQueue_t(
00387           reginfo->queueSize(),
00388           reginfo->secondsToStale(),
00389           now
00390         )
00391       );
00392       discardNewQueues_.push_back(newborn);
00393       return QueueID(
00394         enquing_policy::DiscardNew,
00395         discardNewQueues_.size()-1
00396       );
00397     }
00398     else if (reginfo->queuePolicy() == enquing_policy::DiscardOld)
00399     {
00400       WriteLock_t lock(protectDiscardOldQueues_);
00401       ExpirableDiscardOldQueuePtr newborn(
00402         new ExpirableDiscardOldQueue_t(
00403           reginfo->queueSize(),
00404           reginfo->secondsToStale(),
00405           now
00406         )
00407       );
00408       discardOldQueues_.push_back(newborn);
00409       return QueueID(
00410         enquing_policy::DiscardOld,
00411         discardOldQueues_.size()-1
00412       );
00413     }
00414     return QueueID();
00415   }
00416   
00417   template <class T>
00418   void
00419   QueueCollection<T>::removeQueues()
00420   {
00421     clearQueues();
00422     
00423     WriteLock_t lockDiscardNew(protectDiscardNewQueues_);
00424     WriteLock_t lockDiscardOld(protectDiscardOldQueues_);
00425     discardNewQueues_.clear();
00426     discardOldQueues_.clear();    
00427 
00428     WriteLock_t lockLookup(protectLookup_);
00429     queueIdLookup_.clear();
00430   }
00431   
00432   template <class T>
00433   typename QueueCollection<T>::SizeType
00434   QueueCollection<T>::size() const
00435   {
00436     // We obtain locks not because it is unsafe to read the sizes
00437     // without locking, but because we want consistent values.
00438     ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
00439     ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
00440     return discardNewQueues_.size() + discardOldQueues_.size();
00441   }
00442   
00443   template <class T>
00444   void 
00445   QueueCollection<T>::addEvent(T const& event)
00446   {
00447     ReadLock_t lockDiscardNew(protectDiscardNewQueues_);
00448     ReadLock_t lockDiscardOld(protectDiscardOldQueues_);
00449     
00450     utils::TimePoint_t now = utils::getCurrentTime();
00451     QueueIDs routes = event.getEventConsumerTags();
00452     
00453     for( QueueIDs::const_iterator it = routes.begin(), itEnd = routes.end();
00454          it != itEnd; ++it )
00455     {
00456       const SizeType droppedEvents = enqueueEvent_( *it, event, now );
00457       if ( droppedEvents > 0 )
00458         consumerMonitorCollection_.addDroppedEvents( *it, droppedEvents );
00459       else
00460         consumerMonitorCollection_.addQueuedEventSample( *it, event.totalDataSize() );
00461     }
00462   }
00463   
00464   template <class T>
00465   typename QueueCollection<T>::ValueType
00466   QueueCollection<T>::popEvent(const QueueID& id)
00467   {
00468     ValueType result;
00469     switch (id.policy()) 
00470     {
00471       case enquing_policy::DiscardNew:
00472       {
00473         ReadLock_t lock(protectDiscardNewQueues_);
00474         try
00475         {
00476           if ( discardNewQueues_.at(id.index())->deqNowait(result) )
00477             consumerMonitorCollection_.addServedEventSample(id,
00478               result.first.totalDataSize());
00479         }
00480         catch(std::out_of_range)
00481         {
00482           throwUnknownQueueid(id);
00483         }
00484         break;
00485       }
00486       case enquing_policy::DiscardOld:
00487       {
00488         ReadLock_t lock(protectDiscardOldQueues_);
00489         try
00490         {
00491           if ( discardOldQueues_.at(id.index())->deqNowait(result) )
00492             consumerMonitorCollection_.addServedEventSample(id,
00493               result.first.totalDataSize());
00494         }
00495         catch(std::out_of_range)
00496         {
00497           throwUnknownQueueid(id);
00498         }
00499         break;
00500       }
00501       default:
00502       {
00503         throwUnknownQueueid(id);
00504         // does not return, no break needed
00505       }
00506     }
00507     return result;
00508   }
00509   
00510   template <class T>
00511   typename QueueCollection<T>::ValueType
00512   QueueCollection<T>::popEvent(const ConsumerID& cid)
00513   {
00514     ValueType result;
00515     if (!cid.isValid()) return result;
00516     QueueID id;
00517     {
00518       // Scope to control lifetime of lock.
00519       ReadLock_t lock(protectLookup_);
00520       IDLookup_t::const_iterator i = queueIdLookup_.find(cid);
00521       if (i == queueIdLookup_.end()) return result;
00522       id = i->second;
00523     }
00524     return popEvent(id);
00525   }
00526   
00527   template <class T>
00528   void
00529   QueueCollection<T>::clearQueue(const QueueID& id)
00530   {
00531     switch (id.policy()) 
00532     {
00533       case enquing_policy::DiscardNew:
00534       {
00535         ReadLock_t lock(protectDiscardNewQueues_);
00536         try
00537         {
00538           const SizeType clearedEvents =
00539             discardNewQueues_.at(id.index())->clear();
00540           
00541           consumerMonitorCollection_.addDroppedEvents(
00542             id, clearedEvents);
00543         }
00544         catch(std::out_of_range)
00545         {
00546           throwUnknownQueueid(id);
00547         }
00548         break;
00549       }
00550       case enquing_policy::DiscardOld:
00551       {
00552         ReadLock_t lock(protectDiscardOldQueues_);
00553         try
00554         {
00555           const SizeType clearedEvents = 
00556             discardOldQueues_.at(id.index())->clear();
00557           
00558           consumerMonitorCollection_.addDroppedEvents(
00559             id, clearedEvents);
00560         }
00561         catch(std::out_of_range)
00562         {
00563           throwUnknownQueueid(id);
00564         }
00565         break;
00566       }
00567       default:
00568       {
00569         throwUnknownQueueid(id);
00570         // does not return, no break needed
00571       }
00572     }
00573   }
00574   
00575   template <class T>
00576   bool
00577   QueueCollection<T>::clearStaleQueues(const utils::TimePoint_t& now)
00578   {
00579     bool result(false);
00580     SizeType clearedEvents;
00581     
00582     {
00583       ReadLock_t lock(protectDiscardNewQueues_);
00584       const SizeType numQueues = discardNewQueues_.size();
00585       for (SizeType i = 0; i < numQueues; ++i)
00586       {
00587         if ( discardNewQueues_[i]->clearIfStale(now, clearedEvents) )
00588         {
00589           consumerMonitorCollection_.addDroppedEvents(
00590             QueueID(enquing_policy::DiscardNew, i),
00591             clearedEvents
00592           );
00593           result = true;
00594         }
00595       }
00596     }
00597     {
00598       ReadLock_t lock(protectDiscardOldQueues_);
00599       const SizeType numQueues = discardOldQueues_.size();
00600       for (SizeType i = 0; i < numQueues; ++i)
00601       {
00602         if ( discardOldQueues_[i]->clearIfStale(now, clearedEvents) )
00603         {
00604           consumerMonitorCollection_.addDroppedEvents(
00605             QueueID(enquing_policy::DiscardOld, i),
00606             clearedEvents
00607           );
00608           result = true;
00609         }
00610       }
00611     }
00612     return result;
00613   }
00614   
00615   template <class T>
00616   void
00617   QueueCollection<T>::clearQueues()
00618   {
00619     {
00620       ReadLock_t lock(protectDiscardNewQueues_);
00621       const SizeType numQueues = discardNewQueues_.size();
00622       for (SizeType i = 0; i < numQueues; ++i)
00623       {
00624         const SizeType clearedEvents =
00625           discardNewQueues_[i]->clear();
00626 
00627         consumerMonitorCollection_.addDroppedEvents(
00628           QueueID(enquing_policy::DiscardNew, i),
00629           clearedEvents
00630         );
00631       }
00632     }
00633     {
00634       ReadLock_t lock(protectDiscardOldQueues_);
00635       const SizeType numQueues = discardOldQueues_.size();
00636       for (SizeType i = 0; i < numQueues; ++i)
00637       {
00638         const SizeType clearedEvents =
00639           discardOldQueues_[i]->clear();
00640 
00641         consumerMonitorCollection_.addDroppedEvents(
00642           QueueID(enquing_policy::DiscardOld, i),
00643           clearedEvents
00644         );
00645       }
00646     }
00647   }
00648   
00649   template <class T>
00650   bool
00651   QueueCollection<T>::empty(const QueueID& id) const
00652   {
00653     bool result(true);
00654     switch (id.policy()) 
00655     {
00656       case enquing_policy::DiscardNew:
00657       {
00658         ReadLock_t lock(protectDiscardNewQueues_);
00659         try
00660         {
00661           result = discardNewQueues_.at(id.index())->empty();
00662         }
00663         catch(std::out_of_range)
00664         {
00665           throwUnknownQueueid(id);
00666         }
00667         break;
00668       }
00669       case enquing_policy::DiscardOld:
00670       {
00671         ReadLock_t lock(protectDiscardOldQueues_);
00672         try
00673         {
00674           result = discardOldQueues_.at(id.index())->empty();
00675         }
00676         catch(std::out_of_range)
00677         {
00678           throwUnknownQueueid(id);
00679         }
00680         break;
00681       }
00682       default:
00683       {
00684         throwUnknownQueueid(id);
00685         // does not return, no break needed
00686       }
00687     }
00688     return result;
00689   }
00690   
00691   template <class T>
00692   bool
00693   QueueCollection<T>::full(const QueueID& id) const
00694   {
00695     bool result(true);
00696     switch (id.policy()) 
00697     {
00698       case enquing_policy::DiscardNew:
00699       {
00700         ReadLock_t lock(protectDiscardNewQueues_);
00701         try
00702         {
00703           result = discardNewQueues_.at(id.index())->full();
00704         }
00705         catch(std::out_of_range)
00706         {
00707           throwUnknownQueueid(id);
00708         }
00709         break;
00710       }
00711       case enquing_policy::DiscardOld:
00712       {
00713         ReadLock_t lock(protectDiscardOldQueues_);
00714         try
00715         {
00716           result = discardOldQueues_.at(id.index())->full();
00717         }
00718         catch(std::out_of_range)
00719         {
00720           throwUnknownQueueid(id);
00721         }
00722         break;
00723       }
00724       default:
00725       {
00726         throwUnknownQueueid(id);
00727         // does not return, no break needed
00728       }
00729     }
00730     return result;
00731   }
00732   
00733   template <class T>
00734   bool
00735   QueueCollection<T>::stale(const QueueID& id, const utils::TimePoint_t& now) const
00736   {
00737     bool result(true);
00738     switch (id.policy()) 
00739     {
00740       case enquing_policy::DiscardNew:
00741       {
00742         ReadLock_t lock(protectDiscardNewQueues_);
00743         try
00744         {
00745           result = discardNewQueues_.at(id.index())->stale(now);
00746         }
00747         catch(std::out_of_range)
00748         {
00749           throwUnknownQueueid(id);
00750         }
00751         break;
00752       }
00753       case enquing_policy::DiscardOld:
00754       {
00755         ReadLock_t lock(protectDiscardOldQueues_);
00756         try
00757         {
00758           result = discardOldQueues_.at(id.index())->stale(now);
00759         }
00760         catch(std::out_of_range)
00761         {
00762           throwUnknownQueueid(id);
00763         }
00764         break;
00765       }
00766       default:
00767       {
00768         throwUnknownQueueid(id);
00769         // does not return, no break needed
00770       }
00771     }
00772     return result;
00773   }
00774   
00775   template <class T>
00776   bool
00777   QueueCollection<T>::allQueuesStale(const utils::TimePoint_t& now) const
00778   {
00779     {
00780       ReadLock_t lock(protectDiscardNewQueues_);
00781       const SizeType numQueues = discardNewQueues_.size();
00782       for (SizeType i = 0; i < numQueues; ++i)
00783       {
00784         if ( ! discardNewQueues_[i]->stale(now) ) return false;
00785       }
00786     }
00787     {
00788       ReadLock_t lock(protectDiscardOldQueues_);
00789       const SizeType numQueues = discardOldQueues_.size();
00790       for (SizeType i = 0; i < numQueues; ++i)
00791       {
00792         if ( ! discardOldQueues_[i]->stale(now) ) return false;
00793       }
00794     }
00795     return true;
00796   }
00797   
00798   template <class T>
00799   typename QueueCollection<T>::SizeType
00800   QueueCollection<T>::size(const QueueID& id) const
00801   {
00802     SizeType result = 0;
00803     switch (id.policy()) 
00804     {
00805       case enquing_policy::DiscardNew:
00806       {
00807         ReadLock_t lock(protectDiscardNewQueues_);
00808         try
00809         {
00810           result = discardNewQueues_.at(id.index())->size();
00811         }
00812         catch(std::out_of_range)
00813         {
00814           throwUnknownQueueid(id);
00815         }
00816         break;
00817       }
00818       case enquing_policy::DiscardOld:
00819       {
00820         ReadLock_t lock(protectDiscardOldQueues_);
00821         try
00822         {
00823           result = discardOldQueues_.at(id.index())->size();
00824         }
00825         catch(std::out_of_range)
00826         {
00827           throwUnknownQueueid(id);
00828         }
00829         break;
00830       }
00831       default:
00832       {
00833         throwUnknownQueueid( id );
00834         // does not return, no break needed
00835       }
00836     }
00837     return result;
00838   }
00839   
00840   template <class T>
00841   typename QueueCollection<T>::SizeType
00842   QueueCollection<T>::enqueueEvent_
00843   (
00844     QueueID const& id, 
00845     T const& event,
00846     utils::TimePoint_t const& now
00847   )
00848   {
00849     switch (id.policy())
00850     {
00851       case enquing_policy::DiscardNew:
00852       {
00853         try
00854         {
00855           return discardNewQueues_.at(id.index())->enqNowait(event,now);
00856         }
00857         catch(std::out_of_range)
00858         {
00859           throwUnknownQueueid(id);
00860         }
00861         break;
00862       }
00863       case enquing_policy::DiscardOld:
00864       {
00865         try
00866         {
00867           return discardOldQueues_.at(id.index())->enqNowait(event,now);
00868         }
00869         catch(std::out_of_range)
00870         {
00871           throwUnknownQueueid(id);
00872         }
00873         break;
00874       }
00875       default:
00876       {
00877         throwUnknownQueueid(id);
00878         // does not return, no break needed
00879       }
00880     }
00881     return 1; // event could not be entered
00882   }
00883   
00884 } // namespace stor
00885 
00886 #endif // EventFilter_StorageManager_QueueCollection_h 
00887