FWCore
Utilities
src
SingleConsumerQ.cc
Go to the documentation of this file.
1
#include "FWCore/Utilities/interface/SingleConsumerQ.h"

#include <cstddef>
#include <stdexcept>
#include <string>
2
3
namespace
edm
{
4
5
namespace {
  // Returns `value` unchanged when it is strictly positive; otherwise throws
  // std::invalid_argument naming the offending constructor argument.
  int requirePositive(int value, char const* what) {
    if (value <= 0) {
      throw std::invalid_argument(std::string("SingleConsumerQ: ") + what + " must be positive, got " +
                                  std::to_string(value));
    }
    return value;
  }
}  // namespace

// Builds a queue backed by one contiguous block of
// max_event_size * max_queue_depth bytes, carved into max_queue_depth
// fixed-size buffers that are all initially available in buffer_pool_.
//
// max_event_size   size in bytes of each buffer; must be > 0
// max_queue_depth  number of buffers and capacity of the queue; must be > 0
//
// Throws std::invalid_argument if either argument is zero or negative.
// The checks run inside the first two member initialisers, i.e. before mem_
// is sized, so a bad argument can never turn into an enormous allocation,
// an empty pool that getProducerBuffer() would index out of range, or a
// zero-depth queue on which commitProducerBuffer() would wait forever.
SingleConsumerQ::SingleConsumerQ(int max_event_size, int max_queue_depth)
    : max_event_size_(requirePositive(max_event_size, "max_event_size")),
      max_queue_depth_(requirePositive(max_queue_depth, "max_queue_depth")),
      pos_(max_queue_depth - 1),
      // multiply in size_t so the product cannot overflow int
      mem_(static_cast<std::size_t>(max_event_size) * static_cast<std::size_t>(max_queue_depth)),
      buffer_pool_(),
      queue_(max_queue_depth),
      fpos_(),
      bpos_(),
      pool_lock_(),
      queue_lock_(),
      pool_cond_(),
      pop_cond_(),
      push_cond_() {
  // mem_ is non-empty here because both arguments were validated above.
  char* const first = &mem_[0];
  // Form the end pointer by arithmetic instead of indexing mem_[size()],
  // which would be an out-of-range element access.
  char* const last = first + mem_.size();

  // Register the start address of every max_event_size-byte slice.
  for (char* slot = first; slot < last; slot += max_event_size) {
    buffer_pool_.push_back(slot);
  }
}
24
25
// Nothing to release by hand: the destructor body is empty and the members
// are destroyed by their own destructors.
SingleConsumerQ::~SingleConsumerQ() = default;
26
27
// Hands out one buffer from the pool, blocking while none is available.
//
// Returns a Buffer built from the buffer's address and max_event_size_.
SingleConsumerQ::Buffer SingleConsumerQ::getProducerBuffer() {
  std::unique_lock<std::mutex> guard(pool_lock_);

  // pos_ is the index of the top pool entry; a negative value means the
  // pool is empty, so sleep until releaseProducerBuffer() puts one back.
  pool_cond_.wait(guard, [this] { return pos_ >= 0; });

  // Take the top entry and lower the top-of-pool index past it.
  void* const slot = buffer_pool_[pos_--];
  return Buffer(slot, max_event_size_);
}
38
39
void
SingleConsumerQ::releaseProducerBuffer
(
void
*
v
) {
40
// get lock
41
std::lock_guard<std::mutex> sl(
pool_lock_
);
42
++
pos_
;
43
buffer_pool_
[
pos_
] =
v
;
44
pool_cond_
.notify_all();
45
}
46
47
void
SingleConsumerQ::commitProducerBuffer
(
void
*
v
,
int
len) {
48
// get lock
49
std::unique_lock<std::mutex> sl(
queue_lock_
);
50
// if full, wait for item to be removed
51
while
((
bpos_
+
max_queue_depth_
) ==
fpos_
) {
52
push_cond_
.wait(sl);
53
}
54
55
// put buffer into queue
56
queue_
[
fpos_
%
max_queue_depth_
] =
Buffer
(
v
, len);
57
++
fpos_
;
58
// signal consumer
59
pop_cond_
.notify_all();
60
}
61
62
// Removes and returns the oldest committed buffer, blocking while the
// queue is empty.
SingleConsumerQ::Buffer SingleConsumerQ::getConsumerBuffer() {
  std::unique_lock<std::mutex> guard(queue_lock_);

  // Equal push (fpos_) and pop (bpos_) counters mean nothing is queued.
  pop_cond_.wait(guard, [this] { return bpos_ != fpos_; });

  // Copy the entry out of its slot, then mark it consumed.
  Buffer ready = queue_[bpos_ % max_queue_depth_];
  ++bpos_;

  // A slot just became free: wake any producer blocked in
  // commitProducerBuffer().
  push_cond_.notify_all();
  return ready;
}
77
78
void
SingleConsumerQ::releaseConsumerBuffer
(
void
*
v
) {
79
// should the buffer be placed back onto the queue and not released?
80
// we got here because a commit did to occur in the consumer.
81
// we will allow consumers to call or not call commit for now, meaning
82
// that we cannot distinguish between exception conditions and normal
83
// return. The buffer will always be released
84
releaseProducerBuffer
(
v
);
85
}
86
87
void
SingleConsumerQ::commitConsumerBuffer
(
void
*
v
,
int
) {
releaseProducerBuffer
(
v
); }
88
}
// namespace edm
edm::SingleConsumerQ::pool_cond_
std::condition_variable pool_cond_
Definition:
SingleConsumerQ.h:123
mps_fire.i
i
Definition:
mps_fire.py:428
edm::SingleConsumerQ::~SingleConsumerQ
~SingleConsumerQ()
Definition:
SingleConsumerQ.cc:25
edm::SingleConsumerQ::pos_
int pos_
Definition:
SingleConsumerQ.h:115
edm
HLT enums.
Definition:
AlignableModifier.h:19
SingleConsumerQ.h
findQualityFiles.v
v
Definition:
findQualityFiles.py:179
edm::SingleConsumerQ::commitConsumerBuffer
void commitConsumerBuffer(void *, int)
Definition:
SingleConsumerQ.cc:87
edm::SingleConsumerQ::SingleConsumerQ
SingleConsumerQ(const SingleConsumerQ &)=delete
edm::SingleConsumerQ::Buffer
Definition:
SingleConsumerQ.h:45
edm::SingleConsumerQ::max_queue_depth_
int max_queue_depth_
Definition:
SingleConsumerQ.h:114
edm::SingleConsumerQ::releaseProducerBuffer
void releaseProducerBuffer(void *)
Definition:
SingleConsumerQ.cc:39
edm::SingleConsumerQ::getConsumerBuffer
Buffer getConsumerBuffer()
Definition:
SingleConsumerQ.cc:62
edm::SingleConsumerQ::pop_cond_
std::condition_variable pop_cond_
Definition:
SingleConsumerQ.h:124
edm::SingleConsumerQ::queue_lock_
std::mutex queue_lock_
Definition:
SingleConsumerQ.h:122
edm::SingleConsumerQ::commitProducerBuffer
void commitProducerBuffer(void *, int)
Definition:
SingleConsumerQ.cc:47
edm::SingleConsumerQ::releaseConsumerBuffer
void releaseConsumerBuffer(void *)
Definition:
SingleConsumerQ.cc:78
reco_skim_cfg_mod.max_event_size
max_event_size
Definition:
reco_skim_cfg_mod.py:124
edm::SingleConsumerQ::mem_
ByteArray mem_
Definition:
SingleConsumerQ.h:116
edm::SingleConsumerQ::buffer_pool_
Pool buffer_pool_
Definition:
SingleConsumerQ.h:117
edm::SingleConsumerQ::queue_
Queue queue_
Definition:
SingleConsumerQ.h:118
edm::SingleConsumerQ::fpos_
unsigned int fpos_
Definition:
SingleConsumerQ.h:119
edm::SingleConsumerQ::push_cond_
std::condition_variable push_cond_
Definition:
SingleConsumerQ.h:125
edm::SingleConsumerQ::getProducerBuffer
Buffer getProducerBuffer()
Definition:
SingleConsumerQ.cc:27
edm::SingleConsumerQ::max_event_size_
int max_event_size_
Definition:
SingleConsumerQ.h:113
edm::SingleConsumerQ::bpos_
unsigned int bpos_
Definition:
SingleConsumerQ.h:119
edm::SingleConsumerQ::pool_lock_
std::mutex pool_lock_
Definition:
SingleConsumerQ.h:121
reco_skim_cfg_mod.max_queue_depth
max_queue_depth
Definition:
reco_skim_cfg_mod.py:128
Generated for CMSSW Reference Manual by
1.8.16