CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_2_SLHC2/src/FWCore/Utilities/interface/SingleConsumerQ.h

Go to the documentation of this file.
00001 #ifndef FWCore_Utilities_SingleConsumerQ_h
00002 #define FWCore_Utilities_SingleConsumerQ_h
00003 
00004 // -*- C++ -*-
00005 
00006 /*
00007  A bounded queue for use in a multi-threaded producer/consumer application.
00008  This is a simple design.  It is only meant to be used where there is
00009  one consumer and one or more producers using a queue instance.
00010 
00011  The problem with multiple consumers is the separate front/pop
00012  member functions.  If they are pulled together into one function,
00013  multiple consumers may be possible, but exception safety would then
00014  be a problem - popping an item off the queue to be held as a local
00015  variable, followed by its removal from the queue.  Having fixed size
00016  buffers within a fixed size pool and using a circular buffer as the
00017  queue alleviates most of this problem because exceptions will not
00018  occur during manipulation.  The only problem left to be checked is
00019  how (or if) the boost mutex manipulation can throw and when.
00020 
00021  Note: the current implementation has no protection against unsigned int
00022  overflows
00023 
00024  Missing:
00025   - the ring buffer is really not used to its fullest extent
00026   - the buffer sizes are fixed and cannot grow
00027   - a simple Buffer object is returned that has the pointer and len
00028     separate.  The length should be stored as the first word of the
00029     buffer itself
00030   - timeouts for consumer
00031   - good way to signal to consumer to end
00032   - keeping the instance of this thing around until all using threads are
00033     done with it
00034 
00035 */
00036 
00037 #include <vector>
00038 #include "boost/thread/mutex.hpp"
00039 #include "boost/thread/condition.hpp"
00040 
00041 namespace edm {
00042 
00043   class SingleConsumerQ
00044   {
00045   public:
00046     struct Buffer
00047     {
00048       Buffer():ptr_(),len_() { }
00049       Buffer(void* p,int len):ptr_(p),len_(len) { }
00050 
00051       void* ptr_;
00052       int len_;
00053     };
00054 
00055     SingleConsumerQ(int max_event_size, int max_queue_depth);
00056     ~SingleConsumerQ();
00057 
00058     struct ConsumerType
00059     {
00060       static SingleConsumerQ::Buffer get(SingleConsumerQ& b)
00061       { return b.getConsumerBuffer(); }
00062       static void release(SingleConsumerQ& b, void* v)
00063       { b.releaseConsumerBuffer(v); }
00064       static void commit(SingleConsumerQ& b, void* v,int size)
00065       { b.commitConsumerBuffer(v,size); }
00066     };
00067     struct ProducerType
00068     {
00069       static SingleConsumerQ::Buffer get(SingleConsumerQ& b)
00070       { return b.getProducerBuffer(); }
00071       static void release(SingleConsumerQ& b, void* v)
00072       { b.releaseProducerBuffer(v); }
00073       static void commit(SingleConsumerQ& b, void* v,int size)
00074       { b.commitProducerBuffer(v,size); }
00075     };
00076 
00077     template <class T>
00078     class OperateBuffer
00079     {
00080     public:
00081       explicit OperateBuffer(SingleConsumerQ& b):
00082         b_(b),v_(T::get(b)),committed_(false) { }
00083       ~OperateBuffer()
00084       { if(!committed_) T::release(b_,v_.ptr_); }
00085 
00086       void* buffer() const { return v_.ptr_; }
00087       int size() const { return v_.len_; }
00088       void commit(int theSize=0) { T::commit(b_, v_.ptr_, theSize); committed_=true; }
00089 
00090     private:
00091       SingleConsumerQ& b_;
00092       SingleConsumerQ::Buffer v_;
00093       bool committed_;
00094     };
00095 
00096     typedef OperateBuffer<ConsumerType> ConsumerBuffer;
00097     typedef OperateBuffer<ProducerType> ProducerBuffer;
00098 
00099     Buffer getProducerBuffer();
00100     void releaseProducerBuffer(void*);
00101     void commitProducerBuffer(void*,int);
00102 
00103     Buffer getConsumerBuffer();
00104     void releaseConsumerBuffer(void*);
00105     void commitConsumerBuffer(void*,int);
00106 
00107     int maxEventSize() const { return max_event_size_; }
00108     int maxQueueDepth() const { return max_queue_depth_; }
00109 
00110   private:
00111     // no copy
00112     SingleConsumerQ(const SingleConsumerQ&);
00113 
00114     // the memory for the buffers
00115     typedef std::vector<char> ByteArray;
00116     // the pool of buffers
00117     typedef std::vector<void*> Pool;
00118     // the queue
00119     typedef std::vector<Buffer> Queue;
00120 
00121     int max_event_size_;
00122     int max_queue_depth_;
00123     int pos_; // use pool as stack of avaiable buffers
00124     ByteArray mem_;
00125     Pool buffer_pool_;
00126     Queue queue_;
00127     unsigned int fpos_, bpos_; // positions for queue - front and back
00128 
00129     boost::mutex pool_lock_;
00130     boost::mutex queue_lock_;
00131     boost::condition pool_cond_;
00132     boost::condition pop_cond_;
00133     boost::condition push_cond_;
00134 
00135   };
00136 
00137 
00138 }
00139 #endif