18 #include <condition_variable> 45 auto const&
cv = p.
getParameter<std::vector<edm::InputTag>>(
"consumes");
47 for(
auto const&
c:
cv) {
48 tokens_.emplace_back( iCol.consumes<
int>(
c));
51 auto const& tv = p.
getParameter<std::vector<double>>(
"eventTimes");
69 desc.
add<std::vector<edm::InputTag>>(
"consumes", {})->setComment(
"What event int data products to consume");
70 desc.
add<std::vector<double>>(
"eventTimes")->setComment(
"The time, in seconds, for how long the module should sleep each event. The index to use is based on a modulo of size of the list applied to the Event ID number.");
74 std::vector<edm::EDGetTokenT<int>>
tokens_;
86 value_(p.getParameter<
int>(
"ivalue")),
87 sleeper_(p, consumesCollector())
103 sleeper_.getAndSleep(e);
105 e.
put(std::make_unique<int>(value_));
112 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
113 Sleeper::fillDescription(desc);
121 value_(p.getParameter<
int>(
"ivalue")),
122 sleeper_(p, consumesCollector())
139 sleeper_.getAndSleep(e);
141 e.
put(std::make_unique<int>(value_));
148 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
150 Sleeper::fillDescription(desc);
158 sleeper_(p, consumesCollector())
173 sleeper_.getAndSleep(e);
180 Sleeper::fillDescription(desc);
203 nWaitingEvents_(iPS.getUntrackedParameter<unsigned
int>(
"nWaitingEvents"))
207 waitingStreams_.reserve(nStreams);
208 waitTimesPerStream_.resize(nStreams);
209 waitingTaskPerStream_.resize(nStreams);
213 stopProcessing_ =
true;
214 condition_.notify_one();
215 serverThread_->join();
222 condition_.notify_one();
225 serverThread_ = std::make_unique<std::thread>([
this]() { threadWork(); } );
229 waitTimesPerStream_[
id.value()]={{initTime,workTime,finishTime}};
230 waitingTaskPerStream_[
id.value()]=
std::move(iTask);
232 std::lock_guard<std::mutex> lk{mutex_};
233 waitingStreams_.push_back(
id.
value());
235 condition_.notify_one();
240 if(stopProcessing_) {
243 if(waitingStreams_.size() >= nWaitingEvents_) {
247 return waitingStreams_.size() == activeStreams_;
251 while(not stopProcessing_.load()) {
252 std::vector<int> streamsToProcess;
254 std::unique_lock<std::mutex> lk(mutex_);
255 condition_.wait(lk, [
this]() {
256 return readyToDoSomething();
258 swap(streamsToProcess,waitingStreams_);
260 if(stopProcessing_) {
263 long longestTime = 0;
265 for(
auto i: streamsToProcess) {
266 auto const&
v=waitTimesPerStream_[
i];
267 if(
v[1]>longestTime) {
276 for(
auto i: streamsToProcess) {
277 auto const&
v=waitTimesPerStream_[
i];
279 waitingTaskPerStream_[
i].doneWaiting(std::exception_ptr());
282 waitingTaskPerStream_.clear();
291 std::atomic<unsigned int> activeStreams_{0};
292 std::atomic<bool> stopProcessing_{
false};
298 value_(p.getParameter<
int>(
"ivalue")),
299 sleeper_(p, consumesCollector())
302 auto const& tv = p.
getParameter<std::vector<double>>(
"serviceInitTimes");
303 initTimes_.reserve(tv.size());
305 initTimes_.push_back( static_cast<useconds_t>(
t*1E6));
309 auto const& tv = p.
getParameter<std::vector<double>>(
"serviceWorkTimes");
310 workTimes_.reserve(tv.size());
312 workTimes_.push_back( static_cast<useconds_t>(
t*1E6));
316 auto const& tv = p.
getParameter<std::vector<double>>(
"serviceFinishTimes");
317 finishTimes_.reserve(tv.size());
319 finishTimes_.push_back( static_cast<useconds_t>(
t*1E6));
322 assert(finishTimes_.size() == initTimes_.size());
323 assert(workTimes_.size() == initTimes_.size());
344 sleeper_.getAndSleep(e);
347 server->asyncWork(
id,
std::move(holder), initTimes_[
index], workTimes_[index], finishTimes_[index]);
352 e.
put(std::make_unique<int>(value_));
359 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
360 desc.
add<std::vector<double>>(
"serviceInitTimes");
361 desc.
add<std::vector<double>>(
"serviceWorkTimes");
362 desc.
add<std::vector<double>>(
"serviceFinishTimes");
363 Sleeper::fillDescription(desc);
T getParameter(std::string const &) const
EventNumber_t event() const
static boost::mutex mutex
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
OrphanHandle< PROD > put(std::unique_ptr< PROD > product)
Put a new product.
std::vector< long > initTimes_
void watchPreallocate(Preallocate::slot_type const &iSlot)
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
virtual example_stream void analyze(const edm::Event &, const edm::EventSetup &) override
OneSleepingAnalyzer(edm::ParameterSet const &p)
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
bool getByToken(EDGetToken token, Handle< PROD > &result) const
#define DEFINE_FWK_MODULE(type)
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::vector< useconds_t > eventTimes_
void swap(Association< C > &lhs, Association< C > &rhs)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::Event &e, edm::EventSetup const &c) override
unsigned int maxNumberOfStreams() const
void addDefault(ParameterSetDescription const &psetDescription)
SleepingProducer(edm::ParameterSet const &p)
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::unique_ptr< std::thread > serverThread_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ParameterDescriptionBase * add(U const &iLabel, T const &value)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
const unsigned int nWaitingEvents_
std::vector< edm::EDGetTokenT< int > > tokens_
#define DEFINE_FWK_SERVICE(type)
std::condition_variable condition_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
OneSleepingProducer(edm::ParameterSet const &p)
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
bool readyToDoSomething()
std::vector< long > finishTimes_
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
void analyze(edm::Event const &e, edm::EventSetup const &c) override
std::vector< long > workTimes_
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)