18 #include <condition_variable>
45 auto const&
cv = p.
getParameter<std::vector<edm::InputTag>>(
"consumes");
47 for (
auto const&
c :
cv) {
48 tokens_.emplace_back(iCol.consumes<
int>(
c));
51 auto const& tv = p.
getParameter<std::vector<double>>(
"eventTimes");
67 desc.
add<std::vector<edm::InputTag>>(
"consumes", {})->setComment(
"What event int data products to consume");
68 desc.
add<std::vector<double>>(
"eventTimes")
70 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
71 "modulo of size of the list applied to the Event ID number.");
75 std::vector<edm::EDGetTokenT<int>>
tokens_;
107 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
108 Sleeper::fillDescription(desc);
139 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
141 Sleeper::fillDescription(desc);
165 Sleeper::fillDescription(desc);
188 : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>(
"nWaitingEvents")) {
191 waitingStreams_.reserve(nStreams);
192 waitTimesPerStream_.resize(nStreams);
193 waitingTaskPerStream_.resize(nStreams);
197 stopProcessing_ =
true;
198 condition_.notify_one();
199 serverThread_->join();
204 condition_.notify_one();
207 serverThread_ = std::make_unique<std::thread>([
this]() { threadWork(); });
212 waitTimesPerStream_[
id.value()] = {{initTime, workTime, finishTime}};
213 waitingTaskPerStream_[
id.value()] =
std::move(iTask);
215 std::lock_guard<std::mutex> lk{mutex_};
216 waitingStreams_.push_back(
id.
value());
218 condition_.notify_one();
223 if (stopProcessing_) {
226 if (waitingStreams_.size() >= nWaitingEvents_) {
230 return waitingStreams_.size() == activeStreams_;
234 while (not stopProcessing_.load()) {
235 std::vector<int> streamsToProcess;
237 std::unique_lock<std::mutex> lk(mutex_);
238 condition_.wait(lk, [
this]() {
return readyToDoSomething(); });
239 swap(streamsToProcess, waitingStreams_);
241 if (stopProcessing_) {
244 long longestTime = 0;
246 for (
auto i : streamsToProcess) {
247 auto const&
v = waitTimesPerStream_[
i];
248 if (
v[1] > longestTime) {
257 for (
auto i : streamsToProcess) {
258 auto const&
v = waitTimesPerStream_[
i];
260 waitingTaskPerStream_[
i].doneWaiting(std::exception_ptr());
263 waitingTaskPerStream_.clear();
272 std::atomic<unsigned int> activeStreams_{0};
273 std::atomic<bool> stopProcessing_{
false};
281 auto const& tv = p.
getParameter<std::vector<double>>(
"serviceInitTimes");
282 initTimes_.reserve(tv.size());
284 initTimes_.push_back(static_cast<useconds_t>(
t * 1E6));
288 auto const& tv = p.
getParameter<std::vector<double>>(
"serviceWorkTimes");
289 workTimes_.reserve(tv.size());
291 workTimes_.push_back(static_cast<useconds_t>(
t * 1E6));
295 auto const& tv = p.
getParameter<std::vector<double>>(
"serviceFinishTimes");
296 finishTimes_.reserve(tv.size());
298 finishTimes_.push_back(static_cast<useconds_t>(
t * 1E6));
301 assert(finishTimes_.size() == initTimes_.size());
302 assert(workTimes_.size() == initTimes_.size());
330 server->asyncWork(
id,
std::move(holder), initTimes_[
index], workTimes_[index], finishTimes_[index]);
340 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
341 desc.
add<std::vector<double>>(
"serviceInitTimes");
342 desc.
add<std::vector<double>>(
"serviceWorkTimes");
343 desc.
add<std::vector<double>>(
"serviceFinishTimes");
344 Sleeper::fillDescription(desc);
EventNumber_t event() const
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
const edm::EventSetup & c
std::vector< long > initTimes_
void watchPreallocate(Preallocate::slot_type const &iSlot)
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
OneSleepingAnalyzer(edm::ParameterSet const &p)
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
#define DEFINE_FWK_MODULE(type)
std::vector< std::array< long, 3 > > waitTimesPerStream_
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
example_stream void analyze(const edm::Event &, const edm::EventSetup &) override
std::vector< useconds_t > eventTimes_
void swap(Association< C > &lhs, Association< C > &rhs)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::Event &e, edm::EventSetup const &c) override
unsigned int maxNumberOfStreams() const
Handle< PROD > getHandle(EDGetTokenT< PROD > token) const
void addDefault(ParameterSetDescription const &psetDescription)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
SleepingProducer(edm::ParameterSet const &p)
std::unique_ptr< std::thread > serverThread_
const edm::EDPutTokenT< int > token_
const edm::EDPutTokenT< int > token_
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
ParameterDescriptionBase * add(U const &iLabel, T const &value)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
const unsigned int nWaitingEvents_
std::vector< edm::EDGetTokenT< int > > tokens_
#define DEFINE_FWK_SERVICE(type)
std::condition_variable condition_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
OrphanHandle< PROD > emplace(EDPutTokenT< PROD > token, Args &&...args)
puts a new product
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
OneSleepingProducer(edm::ParameterSet const &p)
T getParameter(std::string const &) const
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
bool readyToDoSomething()
std::vector< long > finishTimes_
void analyze(edm::Event const &e, edm::EventSetup const &c) override
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
std::vector< long > workTimes_
const edm::EDPutTokenT< int > token_
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)