CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventBuffer.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_EventBuffer_h
2 #define IOPool_Streamer_EventBuffer_h
3 
4 // -*- C++ -*-
5 
6 /*
7  A bounded queue for use in a multi-threaded producer/consumer application.
8  This is a simple design. It is only meant to be used where there is
9  one consumer and one or more producers using a queue instance.
10 
11  The problem with multiple consumers is the separate front/pop
12  member functions. If they are pulled together into one function,
13  multiple consumers may be possible, but exception safety would then
14  be a problem - popping an item off the queue to be held as a local
15  variable, followed by its removal from the queue. Having fixed size
16  buffers within a fixed size pool and using a circular buffer as the
17  queue alleviates most of this problem because exceptions will not
18  occur during manipulation. The only problem left to be checked is
19  how (or if) the boost mutex manipulation can throw and when.
20 
21  Note: the current implementation has no protection against unsigned int
22  overflows
23 
24  Missing:
25  - the ring buffer is really not used to its fullest extent
26  - the buffer sizes are fixed and cannot grow
27  - a simple Buffer object is returned that has the pointer and len
28  separate. The length should be stored as the first word of the
29  buffer itself
30  - timeouts for consumer
31  - good way to signal to consumer to end
32  - keeping the instance of this thing around until all using threads are
33  done with it
34 
35 */
36 
37 #include <vector>
38 #include "boost/thread/mutex.hpp"
39 #include "boost/thread/condition.hpp"
40 
41 namespace edm {
42 
44  {
45  public:
// Plain pointer/length pair; the file header notes that the pointer and
// length are kept separate rather than storing the length inside the buffer.
46  struct Buffer
47  {
48  Buffer():ptr_(),len_() { } // value-initializes both members (null pointer, zero length)
49  Buffer(void* p,int len):ptr_(p),len_(len) { } // stores the given pointer and length unchanged
50 
51  void* ptr_; // untyped pointer to the buffer's memory
52  int len_; // length paired with ptr_ (presumably bytes - TODO confirm against EventBuffer.cc)
53  };
54 
56  ~EventBuffer();
57 
58  struct ConsumerType
59  {
61  { return b.getConsumerBuffer(); }
62  static void release(EventBuffer& b, void* v)
63  { b.releaseConsumerBuffer(v); }
64  static void commit(EventBuffer& b, void* v,int size)
65  { b.commitConsumerBuffer(v,size); }
66  };
67  struct ProducerType
68  {
70  { return b.getProducerBuffer(); }
71  static void release(EventBuffer& b, void* v)
72  { b.releaseProducerBuffer(v); }
73  static void commit(EventBuffer& b, void* v,int size)
74  { b.commitProducerBuffer(v,size); }
75  };
76 
77  template <class T>
79  {
80  public:
82  b_(b),v_(T::get(b)),committed_(false) { }
84  { if(!committed_) T::release(b_,v_.ptr_); }
85 
86  void* buffer() const { return v_.ptr_; }
87  int size() const { return v_.len_; }
88  void commit(int size=0) { T::commit(b_,v_.ptr_,size); committed_=true; }
89 
90  private:
93  bool committed_;
94  };
95 
98 
100  void releaseProducerBuffer(void*);
101  void commitProducerBuffer(void*,int);
102 
104  void releaseConsumerBuffer(void*);
105  void commitConsumerBuffer(void*,int);
106 
// Read-only accessors returning the stored max_event_size_ and
// max_queue_depth_ members.
107  int maxEventSize() const { return max_event_size_; }
108  int maxQueueDepth() const { return max_queue_depth_; }
109 
110  private:
111  // no copy
113 
114  // the memory for the buffers
115  typedef std::vector<char> ByteArray;
116  // the pool of buffers
117  typedef std::vector<void*> Pool;
118  // the queue
119  typedef std::vector<Buffer> Queue;
120 
123  int pos_; // use pool as stack of available buffers
127  unsigned int fpos_, bpos_; // positions for queue - front and back
128 
131  boost::condition pool_cond_;
132  boost::condition pop_cond_;
133  boost::condition push_cond_;
134 
135  public:
// Holds queue_lock_ for the duration of the call and returns true when the
// back and front queue positions (bpos_, fpos_) are equal.
136  bool empty() { boost::mutex::scoped_lock sl(queue_lock_); return (bpos_==fpos_); }
137 
138  };
139 
140 
141 }
142 #endif
143 
144 
145 
Buffer getConsumerBuffer()
Definition: EventBuffer.cc:62
OperateBuffer< ConsumerType > ConsumerBuffer
Definition: EventBuffer.h:96
static void commit(EventBuffer &b, void *v, int size)
Definition: EventBuffer.h:73
EventBuffer(int max_event_size, int max_queue_depth)
Definition: EventBuffer.cc:7
static boost::mutex mutex
Definition: LHEProxy.cc:11
OperateBuffer< ProducerType > ProducerBuffer
Definition: EventBuffer.h:97
void commitProducerBuffer(void *, int)
Definition: EventBuffer.cc:45
static void release(EventBuffer &b, void *v)
Definition: EventBuffer.h:62
boost::mutex pool_lock_
Definition: EventBuffer.h:129
EventBuffer(const EventBuffer &)
Definition: EventBuffer.h:112
static void commit(EventBuffer &b, void *v, int size)
Definition: EventBuffer.h:64
unsigned int fpos_
Definition: EventBuffer.h:127
static void release(EventBuffer &b, void *v)
Definition: EventBuffer.h:71
std::vector< void * > Pool
Definition: EventBuffer.h:117
Buffer(void *p, int len)
Definition: EventBuffer.h:49
EventBuffer::Buffer v_
Definition: EventBuffer.h:92
boost::mutex queue_lock_
Definition: EventBuffer.h:130
unsigned int bpos_
Definition: EventBuffer.h:127
boost::condition pool_cond_
Definition: EventBuffer.h:131
boost::condition push_cond_
Definition: EventBuffer.h:133
boost::condition pop_cond_
Definition: EventBuffer.h:132
Buffer getProducerBuffer()
Definition: EventBuffer.cc:22
std::vector< Buffer > Queue
Definition: EventBuffer.h:119
void releaseConsumerBuffer(void *)
Definition: EventBuffer.cc:80
double b
Definition: hdecay.h:120
int maxEventSize() const
Definition: EventBuffer.h:107
ByteArray mem_
Definition: EventBuffer.h:124
std::vector< char > ByteArray
Definition: EventBuffer.h:115
void commitConsumerBuffer(void *, int)
Definition: EventBuffer.cc:90
volatile std::atomic< bool > shutdown_flag false
long double T
int maxQueueDepth() const
Definition: EventBuffer.h:108
tuple size
Write out results.
T get(const Candidate &c)
Definition: component.h:55
void releaseProducerBuffer(void *)
Definition: EventBuffer.cc:35