CMS 3D CMS Logo

SingleConsumerQ.cc
Go to the documentation of this file.
2 
3 namespace edm {
4 
6  : max_event_size_(max_event_size),
7  max_queue_depth_(max_queue_depth),
8  pos_(max_queue_depth - 1),
10  buffer_pool_(),
11  queue_(max_queue_depth),
12  fpos_(),
13  bpos_(),
14  pool_lock_(),
15  queue_lock_(),
16  pool_cond_(),
17  pop_cond_(),
18  push_cond_() {
19  // NOTE: this should throw if the event size or the queue depth is 0; no such check is visible here
20 
21  for (char* i = &mem_[0]; i < &mem_[mem_.size()]; i += max_event_size)
22  buffer_pool_.push_back(i);
23  }
24 
26 
28  // get lock
29  std::unique_lock<std::mutex> sl(pool_lock_);
30  // wait for buffer to appear
31  while (pos_ < 0) {
32  pool_cond_.wait(sl);
33  }
34  void* v = buffer_pool_[pos_];
35  --pos_;
36  return Buffer(v, max_event_size_);
37  }
38 
40  // get lock
41  std::lock_guard<std::mutex> sl(pool_lock_);
42  ++pos_;
43  buffer_pool_[pos_] = v;
44  pool_cond_.notify_all();
45  }
46 
48  // get lock
49  std::unique_lock<std::mutex> sl(queue_lock_);
50  // if full, wait for item to be removed
51  while ((bpos_ + max_queue_depth_) == fpos_) {
52  push_cond_.wait(sl);
53  }
54 
55  // put buffer into queue
57  ++fpos_;
58  // signal consumer
59  pop_cond_.notify_all();
60  }
61 
63  // get lock
64  std::unique_lock<std::mutex> sl(queue_lock_);
65  // if empty, wait for item to appear
66  while (bpos_ == fpos_) {
67  pop_cond_.wait(sl);
68  }
69  // get a buffer from the queue and return it
71  ++bpos_;
72  // note that these operations cannot throw
73  // signal producer
74  push_cond_.notify_all();
75  return v;
76  }
77 
79  // should the buffer be placed back onto the queue and not released?
80  // we got here because a commit did not occur in the consumer.
81  // we will allow consumers to call or not call commit for now, meaning
82  // that we cannot distinguish between exception conditions and normal
83  // return. The buffer will always be released.
85  }
86 
88 } // namespace edm
edm::SingleConsumerQ::pool_cond_
std::condition_variable pool_cond_
Definition: SingleConsumerQ.h:124
mps_fire.i
i
Definition: mps_fire.py:355
edm::SingleConsumerQ::~SingleConsumerQ
~SingleConsumerQ()
Definition: SingleConsumerQ.cc:25
edm::SingleConsumerQ::pos_
int pos_
Definition: SingleConsumerQ.h:116
edm
HLT enums.
Definition: AlignableModifier.h:19
SingleConsumerQ.h
findQualityFiles.v
v
Definition: findQualityFiles.py:179
edm::SingleConsumerQ::commitConsumerBuffer
void commitConsumerBuffer(void *, int)
Definition: SingleConsumerQ.cc:87
edm::SingleConsumerQ::SingleConsumerQ
SingleConsumerQ(int max_event_size, int max_queue_depth)
Definition: SingleConsumerQ.cc:5
edm::SingleConsumerQ::Buffer
Definition: SingleConsumerQ.h:45
edm::SingleConsumerQ::max_queue_depth_
int max_queue_depth_
Definition: SingleConsumerQ.h:115
edm::SingleConsumerQ::releaseProducerBuffer
void releaseProducerBuffer(void *)
Definition: SingleConsumerQ.cc:39
edm::SingleConsumerQ::getConsumerBuffer
Buffer getConsumerBuffer()
Definition: SingleConsumerQ.cc:62
edm::SingleConsumerQ::pop_cond_
std::condition_variable pop_cond_
Definition: SingleConsumerQ.h:125
edm::SingleConsumerQ::queue_lock_
std::mutex queue_lock_
Definition: SingleConsumerQ.h:123
edm::SingleConsumerQ::commitProducerBuffer
void commitProducerBuffer(void *, int)
Definition: SingleConsumerQ.cc:47
edm::SingleConsumerQ::releaseConsumerBuffer
void releaseConsumerBuffer(void *)
Definition: SingleConsumerQ.cc:78
reco_skim_cfg_mod.max_event_size
max_event_size
Definition: reco_skim_cfg_mod.py:124
edm::SingleConsumerQ::mem_
ByteArray mem_
Definition: SingleConsumerQ.h:117
edm::SingleConsumerQ::buffer_pool_
Pool buffer_pool_
Definition: SingleConsumerQ.h:118
edm::SingleConsumerQ::queue_
Queue queue_
Definition: SingleConsumerQ.h:119
edm::SingleConsumerQ::fpos_
unsigned int fpos_
Definition: SingleConsumerQ.h:120
edm::SingleConsumerQ::push_cond_
std::condition_variable push_cond_
Definition: SingleConsumerQ.h:126
edm::SingleConsumerQ::getProducerBuffer
Buffer getProducerBuffer()
Definition: SingleConsumerQ.cc:27
edm::SingleConsumerQ::max_event_size_
int max_event_size_
Definition: SingleConsumerQ.h:114
edm::SingleConsumerQ::bpos_
unsigned int bpos_
Definition: SingleConsumerQ.h:120
edm::SingleConsumerQ::pool_lock_
std::mutex pool_lock_
Definition: SingleConsumerQ.h:122
reco_skim_cfg_mod.max_queue_depth
max_queue_depth
Definition: reco_skim_cfg_mod.py:128