CMS 3D CMS Logo

SingleConsumerQ.h
Go to the documentation of this file.
1 #ifndef FWCore_Utilities_SingleConsumerQ_h
2 #define FWCore_Utilities_SingleConsumerQ_h
3 
4 // -*- C++ -*-
5 
6 /*
7  A bounded queue for use in a multi-threaded producer/consumer application.
8  This is a simple design. It is only meant to be used where there is
9  one consumer and one or more producers using the a queue instance.
10 
11  The problem with multiple consumers is the separate front/pop
12  member functions. If they are pulled together into one function,
13  multiple consumers may be possible, but exception safety would then
14  be a problem - popping an item off the queue to be held as a local
15  variable, followed by its removal from the queue. Having fixed size
16  buffers within a fixed size pool and using a circular buffer as the
17  queue alleviates most of this problem because exceptions will not
18  occur during manipulation. The only problem left to be checked is
19  how (or if) the boost mutex manipulation can throw and when.
20 
21  Note: the current implementation has no protection again unsigned int
22  overflows
23 
24  Missing:
25  - the ring buffer is really not used to its fullest extent
26  - the buffer sizes are fixed and cannot grow
27  - a simple Buffer object is returned that has the pointer and len
28  separate. The length should be stored as the first word of the
29  buffer itself
30  - timeouts for consumer
31  - good way to signal to consumer to end
32  - keeping the instance of this thing around until all using threads are
33  done with it
34 
35 */
36 
37 #include <vector>
38 #include <mutex>
39 #include <condition_variable>
40 
41 namespace edm {
42 
44  public:
45  struct Buffer {
46  Buffer() : ptr_(), len_() {}
47  Buffer(void* p, int len) : ptr_(p), len_(len) {}
48 
49  void* ptr_;
50  int len_;
51  };
52 
55 
56  struct ConsumerType {
57  static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getConsumerBuffer(); }
58  static void release(SingleConsumerQ& b, void* v) { b.releaseConsumerBuffer(v); }
59  static void commit(SingleConsumerQ& b, void* v, int size) { b.commitConsumerBuffer(v, size); }
60  };
61  struct ProducerType {
62  static SingleConsumerQ::Buffer get(SingleConsumerQ& b) { return b.getProducerBuffer(); }
63  static void release(SingleConsumerQ& b, void* v) { b.releaseProducerBuffer(v); }
64  static void commit(SingleConsumerQ& b, void* v, int size) { b.commitProducerBuffer(v, size); }
65  };
66 
67  template <class T>
68  class OperateBuffer {
69  public:
72  if (!committed_)
73  T::release(b_, v_.ptr_);
74  }
75 
76  void* buffer() const { return v_.ptr_; }
77  int size() const { return v_.len_; }
78  void commit(int theSize = 0) {
79  T::commit(b_, v_.ptr_, theSize);
80  committed_ = true;
81  }
82 
83  private:
86  bool committed_;
87  };
88 
91 
93  void releaseProducerBuffer(void*);
94  void commitProducerBuffer(void*, int);
95 
97  void releaseConsumerBuffer(void*);
98  void commitConsumerBuffer(void*, int);
99 
100  int maxEventSize() const { return max_event_size_; }
101  int maxQueueDepth() const { return max_queue_depth_; }
102 
103  private:
104  // no copy
105  SingleConsumerQ(const SingleConsumerQ&) = delete;
106 
107  // the memory for the buffers
108  typedef std::vector<char> ByteArray;
109  // the pool of buffers
110  typedef std::vector<void*> Pool;
111  // the queue
112  typedef std::vector<Buffer> Queue;
113 
116  int pos_; // use pool as stack of avaiable buffers
120  unsigned int fpos_, bpos_; // positions for queue - front and back
121 
124  std::condition_variable pool_cond_;
125  std::condition_variable pop_cond_;
126  std::condition_variable push_cond_;
127  };
128 
129 } // namespace edm
130 #endif
edm::SingleConsumerQ::ProducerType
Definition: SingleConsumerQ.h:61
edm::SingleConsumerQ::pool_cond_
std::condition_variable pool_cond_
Definition: SingleConsumerQ.h:124
edm::SingleConsumerQ::Buffer::Buffer
Buffer()
Definition: SingleConsumerQ.h:46
edm::SingleConsumerQ::maxEventSize
int maxEventSize() const
Definition: SingleConsumerQ.h:100
edm::SingleConsumerQ::Buffer::Buffer
Buffer(void *p, int len)
Definition: SingleConsumerQ.h:47
funct::false
false
Definition: Factorize.h:29
edm::SingleConsumerQ::~SingleConsumerQ
~SingleConsumerQ()
Definition: SingleConsumerQ.cc:25
edm::SingleConsumerQ::pos_
int pos_
Definition: SingleConsumerQ.h:116
edm::SingleConsumerQ::ConsumerType::release
static void release(SingleConsumerQ &b, void *v)
Definition: SingleConsumerQ.h:58
edm
HLT enums.
Definition: AlignableModifier.h:19
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::SingleConsumerQ::OperateBuffer::b_
SingleConsumerQ & b_
Definition: SingleConsumerQ.h:84
edm::SingleConsumerQ::Buffer::ptr_
void * ptr_
Definition: SingleConsumerQ.h:49
edm::SingleConsumerQ::Buffer::len_
int len_
Definition: SingleConsumerQ.h:50
findQualityFiles.v
v
Definition: findQualityFiles.py:179
edm::SingleConsumerQ::commitConsumerBuffer
void commitConsumerBuffer(void *, int)
Definition: SingleConsumerQ.cc:87
edm::SingleConsumerQ::SingleConsumerQ
SingleConsumerQ(int max_event_size, int max_queue_depth)
Definition: SingleConsumerQ.cc:5
edm::SingleConsumerQ::ConsumerType::get
static SingleConsumerQ::Buffer get(SingleConsumerQ &b)
Definition: SingleConsumerQ.h:57
edm::SingleConsumerQ::OperateBuffer
Definition: SingleConsumerQ.h:68
edm::SingleConsumerQ::Buffer
Definition: SingleConsumerQ.h:45
edm::SingleConsumerQ::OperateBuffer::~OperateBuffer
~OperateBuffer()
Definition: SingleConsumerQ.h:71
edm::SingleConsumerQ::max_queue_depth_
int max_queue_depth_
Definition: SingleConsumerQ.h:115
edm::SingleConsumerQ::ProducerType::commit
static void commit(SingleConsumerQ &b, void *v, int size)
Definition: SingleConsumerQ.h:64
edm::SingleConsumerQ::ProducerType::get
static SingleConsumerQ::Buffer get(SingleConsumerQ &b)
Definition: SingleConsumerQ.h:62
edm::SingleConsumerQ::ConsumerType
Definition: SingleConsumerQ.h:56
edm::SingleConsumerQ::ProducerType::release
static void release(SingleConsumerQ &b, void *v)
Definition: SingleConsumerQ.h:63
b
double b
Definition: hdecay.h:118
edm::SingleConsumerQ::ConsumerType::commit
static void commit(SingleConsumerQ &b, void *v, int size)
Definition: SingleConsumerQ.h:59
edm::SingleConsumerQ::releaseProducerBuffer
void releaseProducerBuffer(void *)
Definition: SingleConsumerQ.cc:39
edm::SingleConsumerQ::OperateBuffer::committed_
bool committed_
Definition: SingleConsumerQ.h:86
edm::SingleConsumerQ::getConsumerBuffer
Buffer getConsumerBuffer()
Definition: SingleConsumerQ.cc:62
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
edm::SingleConsumerQ::pop_cond_
std::condition_variable pop_cond_
Definition: SingleConsumerQ.h:125
edm::SingleConsumerQ::queue_lock_
std::mutex queue_lock_
Definition: SingleConsumerQ.h:123
edm::SingleConsumerQ::commitProducerBuffer
void commitProducerBuffer(void *, int)
Definition: SingleConsumerQ.cc:47
edm::SingleConsumerQ::releaseConsumerBuffer
void releaseConsumerBuffer(void *)
Definition: SingleConsumerQ.cc:78
edm::get
T const & get(Event const &event, InputTag const &tag) noexcept(false)
Definition: Event.h:675
reco_skim_cfg_mod.max_event_size
max_event_size
Definition: reco_skim_cfg_mod.py:124
mutex
static std::mutex mutex
Definition: Proxy.cc:8
edm::SingleConsumerQ::mem_
ByteArray mem_
Definition: SingleConsumerQ.h:117
edm::SingleConsumerQ
Definition: SingleConsumerQ.h:43
edm::SingleConsumerQ::buffer_pool_
Pool buffer_pool_
Definition: SingleConsumerQ.h:118
edm::SingleConsumerQ::queue_
Queue queue_
Definition: SingleConsumerQ.h:119
edm::SingleConsumerQ::fpos_
unsigned int fpos_
Definition: SingleConsumerQ.h:120
edm::SingleConsumerQ::push_cond_
std::condition_variable push_cond_
Definition: SingleConsumerQ.h:126
edm::SingleConsumerQ::OperateBuffer::commit
void commit(int theSize=0)
Definition: SingleConsumerQ.h:78
edm::SingleConsumerQ::ConsumerBuffer
OperateBuffer< ConsumerType > ConsumerBuffer
Definition: SingleConsumerQ.h:89
edm::SingleConsumerQ::getProducerBuffer
Buffer getProducerBuffer()
Definition: SingleConsumerQ.cc:27
edm::SingleConsumerQ::ByteArray
std::vector< char > ByteArray
Definition: SingleConsumerQ.h:108
edm::SingleConsumerQ::ProducerBuffer
OperateBuffer< ProducerType > ProducerBuffer
Definition: SingleConsumerQ.h:90
T
long double T
Definition: Basic3DVectorLD.h:48
edm::SingleConsumerQ::max_event_size_
int max_event_size_
Definition: SingleConsumerQ.h:114
edm::SingleConsumerQ::OperateBuffer::v_
SingleConsumerQ::Buffer v_
Definition: SingleConsumerQ.h:85
edm::SingleConsumerQ::OperateBuffer::size
int size() const
Definition: SingleConsumerQ.h:77
edm::SingleConsumerQ::OperateBuffer::OperateBuffer
OperateBuffer(SingleConsumerQ &b)
Definition: SingleConsumerQ.h:70
edm::SingleConsumerQ::Pool
std::vector< void * > Pool
Definition: SingleConsumerQ.h:110
edm::SingleConsumerQ::Queue
std::vector< Buffer > Queue
Definition: SingleConsumerQ.h:112
edm::SingleConsumerQ::maxQueueDepth
int maxQueueDepth() const
Definition: SingleConsumerQ.h:101
edm::SingleConsumerQ::bpos_
unsigned int bpos_
Definition: SingleConsumerQ.h:120
edm::SingleConsumerQ::pool_lock_
std::mutex pool_lock_
Definition: SingleConsumerQ.h:122
edm::SingleConsumerQ::OperateBuffer::buffer
void * buffer() const
Definition: SingleConsumerQ.h:76
reco_skim_cfg_mod.max_queue_depth
max_queue_depth
Definition: reco_skim_cfg_mod.py:128
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443