17 #include <condition_variable> 45 auto const&
cv =
p.getParameter<std::vector<edm::InputTag>>(
"consumes");
47 for (
auto const&
c :
cv) {
48 tokens_.emplace_back(iCol.consumes<
int>(
c));
51 auto const& tv =
p.getParameter<std::vector<double>>(
"eventTimes");
67 desc.add<std::vector<edm::InputTag>>(
"consumes", {})->setComment(
"What event int data products to consume");
68 desc.add<std::vector<double>>(
"eventTimes")
70 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a " 71 "modulo of size of the list applied to the Event ID number.");
75 std::vector<edm::EDGetTokenT<int>>
tokens_;
107 desc.add<
int>(
"ivalue")->setComment(
"Value to put into Event");
108 Sleeper::fillDescription(
desc);
139 desc.add<
int>(
"ivalue")->setComment(
"Value to put into Event");
141 Sleeper::fillDescription(
desc);
165 Sleeper::fillDescription(
desc);
215 std::lock_guard<std::mutex> lk{
mutex_};
235 std::vector<int> streamsToProcess;
237 std::unique_lock<std::mutex> lk(
mutex_);
244 long longestTime = 0;
246 for (
auto i : streamsToProcess) {
248 if (
v[1] > longestTime) {
251 std::this_thread::sleep_for(std::chrono::microseconds(
v[0]));
254 std::this_thread::sleep_for(std::chrono::microseconds(longestTime));
257 for (
auto i : streamsToProcess) {
259 std::this_thread::sleep_for(std::chrono::microseconds(
v[2]));
281 auto const& tv =
p.getParameter<std::vector<double>>(
"serviceInitTimes");
284 initTimes_.push_back(static_cast<useconds_t>(
t * 1E6));
288 auto const& tv =
p.getParameter<std::vector<double>>(
"serviceWorkTimes");
291 workTimes_.push_back(static_cast<useconds_t>(
t * 1E6));
295 auto const& tv =
p.getParameter<std::vector<double>>(
"serviceFinishTimes");
340 desc.add<
int>(
"ivalue")->setComment(
"Value to put into Event");
341 desc.add<std::vector<double>>(
"serviceInitTimes");
342 desc.add<std::vector<double>>(
"serviceWorkTimes");
343 desc.add<std::vector<double>>(
"serviceFinishTimes");
344 Sleeper::fillDescription(
desc);
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
std::vector< long > initTimes_
void watchPreallocate(Preallocate::slot_type const &iSlot)
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
OneSleepingAnalyzer(edm::ParameterSet const &p)
std::atomic< unsigned int > activeStreams_
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
unsigned int maxNumberOfStreams() const
std::vector< std::array< long, 3 > > waitTimesPerStream_
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::vector< useconds_t > eventTimes_
void swap(Association< C > &lhs, Association< C > &rhs)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::Event &e, edm::EventSetup const &c) override
TEMPL(T2) struct Divides void
void addDefault(ParameterSetDescription const &psetDescription)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
SleepingProducer(edm::ParameterSet const &p)
std::unique_ptr< std::thread > serverThread_
const edm::EDPutTokenT< int > token_
#define DEFINE_FWK_MODULE(type)
const edm::EDPutTokenT< int > token_
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
const unsigned int nWaitingEvents_
std::vector< edm::EDGetTokenT< int > > tokens_
#define DEFINE_FWK_SERVICE(type)
std::condition_variable condition_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
OneSleepingProducer(edm::ParameterSet const &p)
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
bool readyToDoSomething()
std::vector< long > finishTimes_
void analyze(edm::Event const &e, edm::EventSetup const &c) override
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
std::vector< long > workTimes_
const edm::EDPutTokenT< int > token_
std::atomic< bool > stopProcessing_
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)