CMS 3D CMS Logo

TimeStudyModules.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Modules
4 // Class : TimeStudyModules
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu, 22 Mar 2018 16:23:48 GMT
11 //
12 
13 // system include files
14 #include <unistd.h>
15 #include <vector>
16 #include <thread>
17 #include <atomic>
18 #include <condition_variable>
19 #include <mutex>
20 
21 // user include files
27 
33 
35 
39 
40 
41 namespace timestudy {
42  namespace {
43  struct Sleeper {
44  Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol ) {
45  auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
46  tokens_.reserve(cv.size());
47  for(auto const& c: cv) {
48  tokens_.emplace_back( iCol.consumes<int>(c));
49  }
50 
51  auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
52  eventTimes_.reserve(tv.size());
53  for(auto t: tv) {
54  eventTimes_.push_back( static_cast<useconds_t>(t*1E6));
55  }
56  }
57 
58  void
59  getAndSleep(edm::Event const& e) const {
61  for(auto const&t: tokens_) {
62  e.getByToken(t,h);
63  }
64  //Event number minimum value is 1
65  usleep( eventTimes_[ (e.id().event()-1) % eventTimes_.size()]);
66  }
67 
68  static void fillDescription(edm::ParameterSetDescription& desc) {
69  desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
70  desc.add<std::vector<double>>("eventTimes")->setComment("The time, in seconds, for how long the module should sleep each event. The index to use is based on a modulo of size of the list applied to the Event ID number.");
71  }
72 
73  private:
74  std::vector<edm::EDGetTokenT<int>> tokens_;
75  std::vector<useconds_t> eventTimes_;
76 
77  };
78  }
79 //--------------------------------------------------------------------
80 //
81 // Produces an IntProduct instance.
82 //
84 public:
86  value_(p.getParameter<int>("ivalue")),
87  sleeper_(p, consumesCollector())
88  {
89  produces<int>();
90  }
91  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
92 
93  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
94 
95 private:
96  int value_;
97  Sleeper sleeper_;
98 };
99 
100 void
102  // EventSetup is not used.
103  sleeper_.getAndSleep(e);
104 
105  e.put(std::make_unique<int>(value_));
106 }
107 
108 void
111 
112  desc.add<int>("ivalue")->setComment("Value to put into Event");
113  Sleeper::fillDescription(desc);
114 
115  descriptions.addDefault(desc);
116 }
117 
118  class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
119  public:
121  value_(p.getParameter<int>("ivalue")),
122  sleeper_(p, consumesCollector())
123  {
124  produces<int>();
125  usesResource(p.getParameter<std::string>("resource"));
126  }
127  void produce( edm::Event& e, edm::EventSetup const& c) override;
128 
129  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
130 
131  private:
132  int value_;
133  Sleeper sleeper_;
134  };
135 
136  void
138  // EventSetup is not used.
139  sleeper_.getAndSleep(e);
140 
141  e.put(std::make_unique<int>(value_));
142  }
143 
144  void
147 
148  desc.add<int>("ivalue")->setComment("Value to put into Event");
149  desc.add<std::string>("resource",std::string())->setComment("The name of the resource that is being shared");
150  Sleeper::fillDescription(desc);
151 
152  descriptions.addDefault(desc);
153  }
154 
156  public:
158  sleeper_(p, consumesCollector())
159  {
160  }
161  void analyze( edm::Event const& e, edm::EventSetup const& c) override;
162 
163  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
164 
165  private:
166  int value_;
167  Sleeper sleeper_;
168  };
169 
170  void
172  // EventSetup is not used.
173  sleeper_.getAndSleep(e);
174  }
175 
176  void
179 
180  Sleeper::fillDescription(desc);
181 
182  descriptions.addDefault(desc);
183  }
184 
185  /*
186  The SleepingServer is configured to wait to accumulate X events before starting to run.
187  On a call to asyncWork
188  -the data will be added to the streams' slot then the waiting thread will be informed
189  -if the server is waiting on threads
190  - it wakes up and sleeps for 'initTime'
191  - it then checks to see if another event was pushed and if it does it continues to do the sleep loop
192  - once all sleep are done, it checks to see if enough events have contacted it and if so it sleeps for the longest 'workTime' duration given
193  - when done, it sleeps for each event 'finishTime' and when it wakes it sends the callback
194  - when all calledback, it goes back to check if there are waiting events
195  - if there are not enough waiting events, it goes back to waiting on a condition variable
196 
197  The SleepingServer keeps track of the number of active Streams by counting the number of streamBeginLumi and streamEndLumi calls have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting.
198 
199  */
201  public:
203  nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents"))
204  {
205  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
206  auto const nStreams =iBounds.maxNumberOfStreams();
207  waitingStreams_.reserve(nStreams);
208  waitTimesPerStream_.resize(nStreams);
209  waitingTaskPerStream_.resize(nStreams);
210  });
211 
212  iAR.watchPreEndJob([this]() {
213  stopProcessing_ = true;
214  condition_.notify_one();
215  serverThread_->join();
216  });
217  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) {
218  ++activeStreams_;
219  });
220  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
221  --activeStreams_;
222  condition_.notify_one();
223  });
224 
225  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); } );
226  }
227 
228  void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
229  waitTimesPerStream_[id.value()]={{initTime,workTime,finishTime}};
230  waitingTaskPerStream_[id.value()]=std::move(iTask);
231  {
232  std::lock_guard<std::mutex> lk{mutex_};
233  waitingStreams_.push_back(id.value());
234  }
235  condition_.notify_one();
236  }
237 
238  private:
240  if(stopProcessing_) {
241  return true;
242  }
243  if(waitingStreams_.size() >= nWaitingEvents_) {
244  return true;
245  }
246  //every running stream is now waiting
247  return waitingStreams_.size() == activeStreams_;
248  }
249 
250  void threadWork() {
251  while(not stopProcessing_.load()) {
252  std::vector<int> streamsToProcess;
253  {
254  std::unique_lock<std::mutex> lk(mutex_);
255  condition_.wait(lk, [this]() {
256  return readyToDoSomething();
257  });
258  swap(streamsToProcess,waitingStreams_);
259  }
260  if(stopProcessing_) {
261  break;
262  }
263  long longestTime = 0;
264  //simulate filling the external device
265  for(auto i: streamsToProcess) {
266  auto const& v=waitTimesPerStream_[i];
267  if(v[1]>longestTime) {
268  longestTime = v[1];
269  }
270  usleep(v[0]);
271  }
272  //simulate running external device
273  usleep(longestTime);
274 
275  //simulate copying data back
276  for(auto i: streamsToProcess) {
277  auto const& v=waitTimesPerStream_[i];
278  usleep(v[2]);
279  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
280  }
281  }
282  waitingTaskPerStream_.clear();
283  }
284  const unsigned int nWaitingEvents_;
285  std::unique_ptr<std::thread> serverThread_;
286  std::vector<int> waitingStreams_;
287  std::vector<std::array<long,3>> waitTimesPerStream_;
288  std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
290  std::condition_variable condition_;
291  std::atomic<unsigned int> activeStreams_{0};
292  std::atomic<bool> stopProcessing_{false};
293  };
294 
295  class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
296  public:
298  value_(p.getParameter<int>("ivalue")),
299  sleeper_(p, consumesCollector())
300  {
301  {
302  auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
303  initTimes_.reserve(tv.size());
304  for(auto t: tv) {
305  initTimes_.push_back( static_cast<useconds_t>(t*1E6));
306  }
307  }
308  {
309  auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
310  workTimes_.reserve(tv.size());
311  for(auto t: tv) {
312  workTimes_.push_back( static_cast<useconds_t>(t*1E6));
313  }
314  }
315  {
316  auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
317  finishTimes_.reserve(tv.size());
318  for(auto t: tv) {
319  finishTimes_.push_back( static_cast<useconds_t>(t*1E6));
320  }
321  }
322  assert(finishTimes_.size() == initTimes_.size());
323  assert(workTimes_.size() == initTimes_.size());
324 
325  produces<int>();
326  }
327  void acquire(edm::StreamID, edm::Event const & e, edm::EventSetup const& c, edm::WaitingTaskWithArenaHolder holder) const override;
328 
329  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
330 
331  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
332 
333  private:
334  std::vector<long> initTimes_;
335  std::vector<long> workTimes_;
336  std::vector<long> finishTimes_;
337  int value_;
338  Sleeper sleeper_;
339  };
340 
341  void
343  // EventSetup is not used.
344  sleeper_.getAndSleep(e);
346  auto index = (e.id().event()-1) % initTimes_.size();
347  server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
348  }
349 
350  void
352  e.put(std::make_unique<int>(value_));
353  }
354 
355  void
358 
359  desc.add<int>("ivalue")->setComment("Value to put into Event");
360  desc.add<std::vector<double>>("serviceInitTimes");
361  desc.add<std::vector<double>>("serviceWorkTimes");
362  desc.add<std::vector<double>>("serviceFinishTimes");
363  Sleeper::fillDescription(desc);
364 
365  descriptions.addDefault(desc);
366  }
367 
368 }
374 
T getParameter(std::string const &) const
EventNumber_t event() const
Definition: EventID.h:41
static boost::mutex mutex
Definition: Proxy.cc:11
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
OrphanHandle< PROD > put(std::unique_ptr< PROD > product)
Put a new product.
Definition: Event.h:137
void watchPreallocate(Preallocate::slot_type const &iSlot)
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
virtual example_stream void analyze(const edm::Event &, const edm::EventSetup &) override
OneSleepingAnalyzer(edm::ParameterSet const &p)
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
bool getByToken(EDGetToken token, Handle< PROD > &result) const
Definition: Event.h:579
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:17
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
cv
Definition: cuy.py:363
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::vector< useconds_t > eventTimes_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:116
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::Event &e, edm::EventSetup const &c) override
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
void addDefault(ParameterSetDescription const &psetDescription)
SleepingProducer(edm::ParameterSet const &p)
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::unique_ptr< std::thread > serverThread_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ParameterDescriptionBase * add(U const &iLabel, T const &value)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
const unsigned int nWaitingEvents_
std::vector< edm::EDGetTokenT< int > > tokens_
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:113
std::condition_variable condition_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
OneSleepingProducer(edm::ParameterSet const &p)
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
edm::EventID id() const
Definition: EventBase.h:60
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
void analyze(edm::Event const &e, edm::EventSetup const &c) override
def move(src, dest)
Definition: eostools.py:510
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)