CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
SingleConsumerQ.h
Go to the documentation of this file.
1 #ifndef FWCore_Utilities_SingleConsumerQ_h
2 #define FWCore_Utilities_SingleConsumerQ_h
3 
4 // -*- C++ -*-
5 
6 /*
7  A bounded queue for use in a multi-threaded producer/consumer application.
8  This is a simple design. It is only meant to be used where there is
9  one consumer and one or more producers using the a queue instance.
10 
11  The problem with multiple consumers is the separate front/pop
12  member functions. If they are pulled together into one function,
13  multiple consumers may be possible, but exception safety would then
14  be a problem - popping an item off the queue to be held as a local
15  variable, followed by its removal from the queue. Having fixed size
16  buffers within a fixed size pool and using a circular buffer as the
17  queue alleviates most of this problem because exceptions will not
18  occur during manipulation. The only problem left to be checked is
19  how (or if) the boost mutex manipulation can throw and when.
20 
21  Note: the current implementation has no protection again unsigned int
22  overflows
23 
24  Missing:
25  - the ring buffer is really not used to its fullest extent
26  - the buffer sizes are fixed and cannot grow
27  - a simple Buffer object is returned that has the pointer and len
28  separate. The length should be stored as the first word of the
29  buffer itself
30  - timeouts for consumer
31  - good way to signal to consumer to end
32  - keeping the instance of this thing around until all using threads are
33  done with it
34 
35 */
36 
37 #include <vector>
38 #include <mutex>
39 #include <condition_variable>
40 
41 namespace edm {
42 
44  public:
45  struct Buffer {
46  Buffer() : ptr_(), len_() {}
47  Buffer(void* p, int len) : ptr_(p), len_(len) {}
48 
49  void* ptr_;
50  int len_;
51  };
52  // no copy
53  SingleConsumerQ(const SingleConsumerQ&) = delete;
54 
57 
58  struct ConsumerType {
59  static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getConsumerBuffer(); }
60  static void release(SingleConsumerQ& b, void* v) { b.releaseConsumerBuffer(v); }
61  static void commit(SingleConsumerQ& b, void* v, int size) { b.commitConsumerBuffer(v, size); }
62  };
63  struct ProducerType {
64  static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getProducerBuffer(); }
65  static void release(SingleConsumerQ& b, void* v) { b.releaseProducerBuffer(v); }
66  static void commit(SingleConsumerQ& b, void* v, int size) { b.commitProducerBuffer(v, size); }
67  };
68 
69  template <class T>
70  class OperateBuffer {
71  public:
72  explicit OperateBuffer(SingleConsumerQ& b) : b_(b), v_(T::get(b)), committed_(false) {}
74  if (!committed_)
75  T::release(b_, v_.ptr_);
76  }
77 
78  void* buffer() const { return v_.ptr_; }
79  int size() const { return v_.len_; }
80  void commit(int theSize = 0) {
81  T::commit(b_, v_.ptr_, theSize);
82  committed_ = true;
83  }
84 
85  private:
88  bool committed_;
89  };
90 
93 
95  void releaseProducerBuffer(void*);
96  void commitProducerBuffer(void*, int);
97 
99  void releaseConsumerBuffer(void*);
100  void commitConsumerBuffer(void*, int);
101 
102  int maxEventSize() const { return max_event_size_; }
103  int maxQueueDepth() const { return max_queue_depth_; }
104 
105  private:
106  // the memory for the buffers
107  typedef std::vector<char> ByteArray;
108  // the pool of buffers
109  typedef std::vector<void*> Pool;
110  // the queue
111  typedef std::vector<Buffer> Queue;
112 
115  int pos_; // use pool as stack of avaiable buffers
119  unsigned int fpos_, bpos_; // positions for queue - front and back
120 
123  std::condition_variable pool_cond_;
124  std::condition_variable pop_cond_;
125  std::condition_variable push_cond_;
126  };
127 
128 } // namespace edm
129 #endif
std::condition_variable pop_cond_
static void commit(SingleConsumerQ &b, void *v, int size)
void releaseConsumerBuffer(void *)
static void release(SingleConsumerQ &b, void *v)
std::vector< void * > Pool
static std::mutex mutex
Definition: Proxy.cc:8
std::vector< Buffer > Queue
void releaseProducerBuffer(void *)
std::condition_variable pool_cond_
OperateBuffer< ProducerType > ProducerBuffer
OperateBuffer< ConsumerType > ConsumerBuffer
std::condition_variable push_cond_
static void commit(SingleConsumerQ &b, void *v, int size)
std::vector< char > ByteArray
int maxEventSize() const
double b
Definition: hdecay.h:118
void commitProducerBuffer(void *, int)
void commitConsumerBuffer(void *, int)
T const & get(Event const &event, InputTag const &tag) noexcept(false)
Definition: Event.h:679
int maxQueueDepth() const
long double T
tuple size
Write out results.
SingleConsumerQ(const SingleConsumerQ &)=delete
static void release(SingleConsumerQ &b, void *v)