18 #include <condition_variable> 46 auto const&
cv = p.
getParameter<std::vector<edm::InputTag>>(
"consumes");
48 for(
auto const&
c:
cv) {
49 tokens_.emplace_back( iCol.consumes<
int>(
c));
52 auto const& tv = p.
getParameter<std::vector<double>>(
"eventTimes");
70 desc.
add<std::vector<edm::InputTag>>(
"consumes", {})->setComment(
"What event int data products to consume");
71 desc.
add<std::vector<double>>(
"eventTimes")->setComment(
"The time, in seconds, for how long the module should sleep each event. The index to use is based on a modulo of size of the list applied to the Event ID number.");
75 std::vector<edm::EDGetTokenT<int>>
tokens_;
87 value_(p.getParameter<
int>(
"ivalue")),
88 sleeper_(p, consumesCollector()),
89 token_{produces<int>()}
105 sleeper_.getAndSleep(e);
114 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
115 Sleeper::fillDescription(desc);
123 value_(p.getParameter<
int>(
"ivalue")),
124 sleeper_(p, consumesCollector()),
125 token_{produces<int>()}
142 sleeper_.getAndSleep(e);
151 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
153 Sleeper::fillDescription(desc);
161 sleeper_(p, consumesCollector())
175 sleeper_.getAndSleep(e);
182 Sleeper::fillDescription(desc);
205 nWaitingEvents_(iPS.getUntrackedParameter<unsigned
int>(
"nWaitingEvents"))
209 waitingStreams_.reserve(nStreams);
210 waitTimesPerStream_.resize(nStreams);
211 waitingTaskPerStream_.resize(nStreams);
215 stopProcessing_ =
true;
216 condition_.notify_one();
217 serverThread_->join();
224 condition_.notify_one();
227 serverThread_ = std::make_unique<std::thread>([
this]() { threadWork(); } );
231 waitTimesPerStream_[
id.value()]={{initTime,workTime,finishTime}};
232 waitingTaskPerStream_[
id.value()]=
std::move(iTask);
234 std::lock_guard<std::mutex> lk{mutex_};
235 waitingStreams_.push_back(
id.
value());
237 condition_.notify_one();
242 if(stopProcessing_) {
245 if(waitingStreams_.size() >= nWaitingEvents_) {
249 return waitingStreams_.size() == activeStreams_;
253 while(not stopProcessing_.load()) {
254 std::vector<int> streamsToProcess;
256 std::unique_lock<std::mutex> lk(mutex_);
257 condition_.wait(lk, [
this]() {
258 return readyToDoSomething();
260 swap(streamsToProcess,waitingStreams_);
262 if(stopProcessing_) {
265 long longestTime = 0;
267 for(
auto i: streamsToProcess) {
268 auto const&
v=waitTimesPerStream_[
i];
269 if(
v[1]>longestTime) {
278 for(
auto i: streamsToProcess) {
279 auto const&
v=waitTimesPerStream_[
i];
281 waitingTaskPerStream_[
i].doneWaiting(std::exception_ptr());
284 waitingTaskPerStream_.clear();
293 std::atomic<unsigned int> activeStreams_{0};
294 std::atomic<bool> stopProcessing_{
false};
300 value_(p.getParameter<
int>(
"ivalue")),
301 sleeper_(p, consumesCollector()),
302 token_{produces<int>()}
305 auto const& tv =
p.getParameter<std::vector<double>>(
"serviceInitTimes");
306 initTimes_.reserve(tv.size());
308 initTimes_.push_back( static_cast<useconds_t>(
t*1E6));
312 auto const& tv =
p.getParameter<std::vector<double>>(
"serviceWorkTimes");
313 workTimes_.reserve(tv.size());
315 workTimes_.push_back( static_cast<useconds_t>(
t*1E6));
319 auto const& tv =
p.getParameter<std::vector<double>>(
"serviceFinishTimes");
320 finishTimes_.reserve(tv.size());
322 finishTimes_.push_back( static_cast<useconds_t>(
t*1E6));
325 assert(finishTimes_.size() == initTimes_.size());
326 assert(workTimes_.size() == initTimes_.size());
346 sleeper_.getAndSleep(e);
349 server->asyncWork(
id,
std::move(holder), initTimes_[
index], workTimes_[index], finishTimes_[index]);
361 desc.
add<
int>(
"ivalue")->setComment(
"Value to put into Event");
362 desc.
add<std::vector<double>>(
"serviceInitTimes");
363 desc.
add<std::vector<double>>(
"serviceWorkTimes");
364 desc.
add<std::vector<double>>(
"serviceFinishTimes");
365 Sleeper::fillDescription(desc);
T getParameter(std::string const &) const
EventNumber_t event() const
static boost::mutex mutex
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
std::vector< long > initTimes_
void watchPreallocate(Preallocate::slot_type const &iSlot)
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
OneSleepingAnalyzer(edm::ParameterSet const &p)
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
bool getByToken(EDGetToken token, Handle< PROD > &result) const
#define DEFINE_FWK_MODULE(type)
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::vector< useconds_t > eventTimes_
void swap(Association< C > &lhs, Association< C > &rhs)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::Event &e, edm::EventSetup const &c) override
unsigned int maxNumberOfStreams() const
void addDefault(ParameterSetDescription const &psetDescription)
SleepingProducer(edm::ParameterSet const &p)
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::unique_ptr< std::thread > serverThread_
const edm::EDPutTokenT< int > token_
const edm::EDPutTokenT< int > token_
ParameterDescriptionBase * add(U const &iLabel, T const &value)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
const unsigned int nWaitingEvents_
std::vector< edm::EDGetTokenT< int > > tokens_
#define DEFINE_FWK_SERVICE(type)
std::condition_variable condition_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
OrphanHandle< PROD > emplace(EDPutTokenT< PROD > token, Args &&...args)
puts a new product
OneSleepingProducer(edm::ParameterSet const &p)
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
bool readyToDoSomething()
std::vector< long > finishTimes_
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
void analyze(edm::Event const &e, edm::EventSetup const &c) override
std::vector< long > workTimes_
const edm::EDPutTokenT< int > token_
virtual example_stream void analyze(const edm::Event &, const edm::EventSetup &) override
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)