CMS 3D CMS Logo

TimeStudyModules.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Modules
4 // Class : TimeStudyModules
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu, 22 Mar 2018 16:23:48 GMT
11 //
12 
13 // system include files
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
20 
21 // user include files
27 
34 
36 
40 
41 namespace timestudy {
42  namespace {
43  struct Sleeper {
44  Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol) {
45  auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
46  tokens_.reserve(cv.size());
47  for (auto const& c : cv) {
48  tokens_.emplace_back(iCol.consumes<int>(c));
49  }
50 
51  auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
52  eventTimes_.reserve(tv.size());
53  for (auto t : tv) {
54  eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
55  }
56  }
57 
58  void getAndSleep(edm::Event const& e) const {
59  for (auto const& t : tokens_) {
60  (void)e.getHandle(t);
61  }
62  //Event number minimum value is 1
63  usleep(eventTimes_[(e.id().event() - 1) % eventTimes_.size()]);
64  }
65 
66  static void fillDescription(edm::ParameterSetDescription& desc) {
67  desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
68  desc.add<std::vector<double>>("eventTimes")
69  ->setComment(
70  "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
71  "modulo of size of the list applied to the Event ID number.");
72  }
73 
74  private:
75  std::vector<edm::EDGetTokenT<int>> tokens_;
76  std::vector<useconds_t> eventTimes_;
77  };
78  } // namespace
79  //--------------------------------------------------------------------
80  //
81  // Produces an IntProduct instance.
82  //
// NOTE(review): the class-head line and the constructor-name line are not present in this
// listing; the initialiser list below belongs to that constructor — confirm against the real file.
84  public:
// Constructor initialisers: read "ivalue", build the Sleeper from the configuration and this
// module's ConsumesCollector, and obtain the put token from produces<int>().
86  : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
// Per-event entry point; defined out of class below.
87  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
88 
// Describes the configuration parameters accepted by this module.
89  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
90 
91  private:
// Value read from "ivalue".
92  const int value_;
// Shared fetch-and-sleep helper.
93  Sleeper sleeper_;
95  };
96 
// Body of a produce() implementation: run the shared Sleeper on the event, then put value_
// into the event through token_.
// NOTE(review): the signature line is not present in this listing; presumably this is
// SleepingProducer::produce — confirm.
98  // EventSetup is not used.
99  sleeper_.getAndSleep(e);
100 
101  e.emplace(token_, value_);
102  }
103 
106 
// Body of a fillDescriptions() implementation: declares "ivalue", adds the Sleeper parameters,
// and registers the result as the default description.
// NOTE(review): the signature line and the declaration of 'desc' are not present in this listing.
107  desc.add<int>("ivalue")->setComment("Value to put into Event");
108  Sleeper::fillDescription(desc);
109 
110  descriptions.addDefault(desc);
111  }
112 
// edm::one producer declared with SharedResources. It reads "ivalue", runs the shared Sleeper
// for each event and puts the int value into the Event.
113  class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
114  public:
// NOTE(review): the constructor-name line is not present in this listing; the initialiser list
// and body below belong to that constructor.
116  : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
// Declare the resource named by the "resource" parameter as used by this module.
117  usesResource(p.getParameter<std::string>("resource"));
118  }
// Per-event entry point; defined out of class below.
119  void produce(edm::Event& e, edm::EventSetup const& c) override;
120 
// Describes the configuration parameters accepted by this module.
121  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
122 
123  private:
// Value read from "ivalue".
124  const int value_;
// Shared fetch-and-sleep helper.
125  Sleeper sleeper_;
127  };
128 
// Body of a produce() implementation: run the shared Sleeper on the event, then put value_
// into the event through token_.
// NOTE(review): the signature line is not present in this listing; presumably this is
// OneSleepingProducer::produce — confirm.
130  // EventSetup is not used.
131  sleeper_.getAndSleep(e);
132 
133  e.emplace(token_, value_);
134  }
135 
138 
// Body of a fillDescriptions() implementation: declares "ivalue" and "resource" (which defaults
// to an empty string), adds the Sleeper parameters, and registers the default description.
// NOTE(review): the signature line and the declaration of 'desc' are not present in this listing.
139  desc.add<int>("ivalue")->setComment("Value to put into Event");
140  desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
141  Sleeper::fillDescription(desc);
142 
143  descriptions.addDefault(desc);
144  }
145 
// NOTE(review): the class-head line is not present in this listing; the members below belong to
// OneSleepingAnalyzer (see the constructor name).
147  public:
// Builds the Sleeper from the configuration and this module's ConsumesCollector.
148  explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : sleeper_(p, consumesCollector()) {}
// Per-event entry point; defined out of class below.
149  void analyze(edm::Event const& e, edm::EventSetup const& c) override;
150 
// Describes the configuration parameters accepted by this module.
151  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
152 
153  private:
// Shared fetch-and-sleep helper.
154  Sleeper sleeper_;
155  };
156 
// Body of an analyze() implementation: only runs the shared Sleeper on the event.
// NOTE(review): the signature line is not present in this listing; presumably this is
// OneSleepingAnalyzer::analyze — confirm.
158  // EventSetup is not used.
159  sleeper_.getAndSleep(e);
160  }
161 
164 
// Body of a fillDescriptions() implementation: adds only the Sleeper parameters and registers
// the default description.
// NOTE(review): the signature line and the declaration of 'desc' are not present in this listing.
165  Sleeper::fillDescription(desc);
166 
167  descriptions.addDefault(desc);
168  }
169 
170  /*
171  The SleepingServer is configured to wait to accumulate X events before starting to run.
172  On a call to asyncWork
173  -the data will be added to the stream's slot, then the waiting thread will be informed
174  -if the server is waiting on threads
175  - it wakes up and sleeps for 'initTime'
176  - it then checks to see if another event was pushed and, if one was, it continues the sleep loop
177  - once all sleeps are done, it checks to see if enough events have contacted it and if so it sleeps for the longest 'workTime' duration given
178  - when done, it sleeps for each event's 'finishTime' and when it wakes it sends the callback
179  - when all callbacks have been sent, it goes back to check if there are waiting events
180  - if there are not enough waiting events, it goes back to waiting on a condition variable
181 
182  The SleepingServer keeps track of the number of active Streams by counting the number of streamBeginLumi and streamEndLumi calls that have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting.
183 
184  */
186  public:
188  : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
189  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
190  auto const nStreams = iBounds.maxNumberOfStreams();
191  waitingStreams_.reserve(nStreams);
192  waitTimesPerStream_.resize(nStreams);
193  waitingTaskPerStream_.resize(nStreams);
194  });
195 
196  iAR.watchPreEndJob([this]() {
197  stopProcessing_ = true;
198  condition_.notify_one();
199  serverThread_->join();
200  });
201  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
202  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
203  --activeStreams_;
204  condition_.notify_one();
205  });
206 
207  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
208  }
209 
210  void asyncWork(
211  edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
212  waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
213  waitingTaskPerStream_[id.value()] = std::move(iTask);
214  {
215  std::lock_guard<std::mutex> lk{mutex_};
216  waitingStreams_.push_back(id.value());
217  }
218  condition_.notify_one();
219  }
220 
221  private:
// Wait predicate used by threadWork(). Returns true when the server thread should stop waiting:
// a stop was requested, at least nWaitingEvents_ streams are queued, or the number of queued
// streams equals the number of active streams.
// NOTE(review): the signature line is not present in this listing. Also note that the final
// comparison is true when both counts are zero, i.e. when no stream is active and none waits.
223  if (stopProcessing_) {
224  return true;
225  }
226  if (waitingStreams_.size() >= nWaitingEvents_) {
227  return true;
228  }
229  //every running stream is now waiting
230  return waitingStreams_.size() == activeStreams_;
231  }
232 
233  void threadWork() {
234  while (not stopProcessing_.load()) {
235  std::vector<int> streamsToProcess;
236  {
237  std::unique_lock<std::mutex> lk(mutex_);
238  condition_.wait(lk, [this]() { return readyToDoSomething(); });
239  swap(streamsToProcess, waitingStreams_);
240  }
241  if (stopProcessing_) {
242  break;
243  }
244  long longestTime = 0;
245  //simulate filling the external device
246  for (auto i : streamsToProcess) {
247  auto const& v = waitTimesPerStream_[i];
248  if (v[1] > longestTime) {
249  longestTime = v[1];
250  }
251  usleep(v[0]);
252  }
253  //simulate running external device
254  usleep(longestTime);
255 
256  //simulate copying data back
257  for (auto i : streamsToProcess) {
258  auto const& v = waitTimesPerStream_[i];
259  usleep(v[2]);
260  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
261  }
262  }
263  waitingTaskPerStream_.clear();
264  }
// Batch size read from the untracked parameter "nWaitingEvents".
265  const unsigned int nWaitingEvents_;
// Thread running threadWork(); created in the constructor and joined in the pre-end-job callback.
266  std::unique_ptr<std::thread> serverThread_;
// Indices of streams that called asyncWork() and have not been picked up yet (accessed under the mutex).
267  std::vector<int> waitingStreams_;
// Per-stream {initTime, workTime, finishTime} as passed to asyncWork(); the values are handed to usleep().
268  std::vector<std::array<long, 3>> waitTimesPerStream_;
// Per-stream waiting task to signal with doneWaiting() once that stream's simulated work is finished.
269  std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
// Used to wake the server thread.
271  std::condition_variable condition_;
// Incremented before each stream begin-lumi and decremented before each stream end-lumi.
272  std::atomic<unsigned int> activeStreams_{0};
// Set in the pre-end-job callback to make the server thread leave its loop.
273  std::atomic<bool> stopProcessing_{false};
274  };
275 
// Global producer using the ExternalWork extension. acquire() hands the waiting task to the
// server together with per-event init/work/finish times; produce() then puts the int value.
276  class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
277  public:
// NOTE(review): the constructor-name line is not present in this listing; the initialiser list
// and body below belong to that constructor.
279  : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
// Each of the three blocks below converts one list of times from seconds to microseconds.
// The values go through useconds_t before being stored as long.
280  {
281  auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
282  initTimes_.reserve(tv.size());
283  for (auto t : tv) {
284  initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
285  }
286  }
287  {
288  auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
289  workTimes_.reserve(tv.size());
290  for (auto t : tv) {
291  workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
292  }
293  }
294  {
295  auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
296  finishTimes_.reserve(tv.size());
297  for (auto t : tv) {
298  finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
299  }
300  }
// The three lists must have the same length because acquire() indexes all of them with one index.
// NOTE(review): assert is the only check here, so it is compiled out when NDEBUG is defined.
301  assert(finishTimes_.size() == initTimes_.size());
302  assert(workTimes_.size() == initTimes_.size());
303  }
// First phase of the external-work pattern; defined out of class below.
304  void acquire(edm::StreamID,
305  edm::Event const& e,
306  edm::EventSetup const& c,
307  edm::WaitingTaskWithArenaHolder holder) const override;
308 
// Second phase; defined out of class below.
309  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
310 
// Describes the configuration parameters accepted by this module.
311  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
312 
313  private:
// Times in microseconds, selected per event in acquire().
314  std::vector<long> initTimes_;
315  std::vector<long> workTimes_;
316  std::vector<long> finishTimes_;
// Value read from "ivalue".
317  const int value_;
// Shared fetch-and-sleep helper.
318  Sleeper sleeper_;
320  };
321 
323  edm::Event const& e,
324  edm::EventSetup const&,
325  edm::WaitingTaskWithArenaHolder holder) const {
// Runs the shared Sleeper, then passes the waiting task and this event's three times to the
// server via asyncWork().
// NOTE(review): the first signature line and the declaration of 'server' are not present in
// this listing.
326  // EventSetup is not used.
327  sleeper_.getAndSleep(e);
// Same modulo selection as in Sleeper: (event number - 1) modulo the list length.
// NOTE(review): an empty "serviceInitTimes" list would make this a modulo by zero.
329  auto index = (e.id().event() - 1) % initTimes_.size();
330  server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
331  }
332 
// Body of a produce() implementation: puts value_ into the event through token_.
// NOTE(review): the signature line is not present in this listing; presumably this is
// ExternalWorkSleepingProducer::produce — confirm.
334  e.emplace(token_, value_);
335  }
336 
339 
// Body of a fillDescriptions() implementation: declares "ivalue" and the three service time
// lists (none has a default here), adds the Sleeper parameters, and registers the default
// description.
// NOTE(review): the signature line and the declaration of 'desc' are not present in this listing.
340  desc.add<int>("ivalue")->setComment("Value to put into Event");
341  desc.add<std::vector<double>>("serviceInitTimes");
342  desc.add<std::vector<double>>("serviceWorkTimes");
343  desc.add<std::vector<double>>("serviceFinishTimes");
344  Sleeper::fillDescription(desc);
345 
346  descriptions.addDefault(desc);
347  }
348 
349 } // namespace timestudy
T getParameter(std::string const &) const
EventNumber_t event() const
Definition: EventID.h:41
static boost::mutex mutex
Definition: Proxy.cc:11
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
void watchPreallocate(Preallocate::slot_type const &iSlot)
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
OneSleepingAnalyzer(edm::ParameterSet const &p)
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
cv
Definition: cuy.py:364
std::vector< std::array< long, 3 > > waitTimesPerStream_
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
example_stream void analyze(const edm::Event &, const edm::EventSetup &) override
std::vector< useconds_t > eventTimes_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:116
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::Event &e, edm::EventSetup const &c) override
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
Handle< PROD > getHandle(EDGetTokenT< PROD > token) const
Definition: Event.h:539
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
void addDefault(ParameterSetDescription const &psetDescription)
SleepingProducer(edm::ParameterSet const &p)
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
std::unique_ptr< std::thread > serverThread_
const edm::EDPutTokenT< int > token_
const edm::EDPutTokenT< int > token_
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:105
ParameterDescriptionBase * add(U const &iLabel, T const &value)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
const unsigned int nWaitingEvents_
std::vector< edm::EDGetTokenT< int > > tokens_
std::condition_variable condition_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
OrphanHandle< PROD > emplace(EDPutTokenT< PROD > token, Args &&...args)
puts a new product
Definition: Event.h:413
OneSleepingProducer(edm::ParameterSet const &p)
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
edm::EventID id() const
Definition: EventBase.h:59
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
void analyze(edm::Event const &e, edm::EventSetup const &c) override
const edm::EDPutTokenT< int > token_
def move(src, dest)
Definition: eostools.py:511
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)