Go to the documentation of this file.00001 #ifndef FWCore_Utilities_SingleConsumerQ_h
00002 #define FWCore_Utilities_SingleConsumerQ_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include <vector>
00038 #include "boost/thread/mutex.hpp"
00039 #include "boost/thread/condition.hpp"
00040
00041 namespace edm {
00042
00043 class SingleConsumerQ
00044 {
00045 public:
00046 struct Buffer
00047 {
00048 Buffer():ptr_(),len_() { }
00049 Buffer(void* p,int len):ptr_(p),len_(len) { }
00050
00051 void* ptr_;
00052 int len_;
00053 };
00054
00055 SingleConsumerQ(int max_event_size, int max_queue_depth);
00056 ~SingleConsumerQ();
00057
00058 struct ConsumerType
00059 {
00060 static SingleConsumerQ::Buffer get(SingleConsumerQ& b)
00061 { return b.getConsumerBuffer(); }
00062 static void release(SingleConsumerQ& b, void* v)
00063 { b.releaseConsumerBuffer(v); }
00064 static void commit(SingleConsumerQ& b, void* v,int size)
00065 { b.commitConsumerBuffer(v,size); }
00066 };
00067 struct ProducerType
00068 {
00069 static SingleConsumerQ::Buffer get(SingleConsumerQ& b)
00070 { return b.getProducerBuffer(); }
00071 static void release(SingleConsumerQ& b, void* v)
00072 { b.releaseProducerBuffer(v); }
00073 static void commit(SingleConsumerQ& b, void* v,int size)
00074 { b.commitProducerBuffer(v,size); }
00075 };
00076
00077 template <class T>
00078 class OperateBuffer
00079 {
00080 public:
00081 explicit OperateBuffer(SingleConsumerQ& b):
00082 b_(b),v_(T::get(b)),committed_(false) { }
00083 ~OperateBuffer()
00084 { if(!committed_) T::release(b_,v_.ptr_); }
00085
00086 void* buffer() const { return v_.ptr_; }
00087 int size() const { return v_.len_; }
00088 void commit(int theSize=0) { T::commit(b_, v_.ptr_, theSize); committed_=true; }
00089
00090 private:
00091 SingleConsumerQ& b_;
00092 SingleConsumerQ::Buffer v_;
00093 bool committed_;
00094 };
00095
00096 typedef OperateBuffer<ConsumerType> ConsumerBuffer;
00097 typedef OperateBuffer<ProducerType> ProducerBuffer;
00098
00099 Buffer getProducerBuffer();
00100 void releaseProducerBuffer(void*);
00101 void commitProducerBuffer(void*,int);
00102
00103 Buffer getConsumerBuffer();
00104 void releaseConsumerBuffer(void*);
00105 void commitConsumerBuffer(void*,int);
00106
00107 int maxEventSize() const { return max_event_size_; }
00108 int maxQueueDepth() const { return max_queue_depth_; }
00109
00110 private:
00111
00112 SingleConsumerQ(const SingleConsumerQ&);
00113
00114
00115 typedef std::vector<char> ByteArray;
00116
00117 typedef std::vector<void*> Pool;
00118
00119 typedef std::vector<Buffer> Queue;
00120
00121 int max_event_size_;
00122 int max_queue_depth_;
00123 int pos_;
00124 ByteArray mem_;
00125 Pool buffer_pool_;
00126 Queue queue_;
00127 unsigned int fpos_, bpos_;
00128
00129 boost::mutex pool_lock_;
00130 boost::mutex queue_lock_;
00131 boost::condition pool_cond_;
00132 boost::condition pop_cond_;
00133 boost::condition push_cond_;
00134
00135 };
00136
00137
00138 }
00139 #endif