CMS 3D CMS Logo

TimeStudyModules.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Modules
4 // Class : TimeStudyModules
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu, 22 Mar 2018 16:23:48 GMT
11 //
12 
13 // system include files
14 #include <unistd.h>
15 #include <vector>
16 #include <thread>
17 #include <atomic>
18 #include <condition_variable>
19 #include <mutex>
20 
21 // user include files
27 
34 
36 
40 
41 
42 namespace timestudy {
43  namespace {
// Helper embedded in the modules of this file: it asks the Event for each
// configured int product and then sleeps for a configurable, per-event time.
44  struct Sleeper {
 // Registers one consumes<int> token per InputTag in "consumes" and stores each
 // "eventTimes" entry, scaled by 1E6 and cast to useconds_t, in eventTimes_.
45  Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol ) {
46  auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
47  tokens_.reserve(cv.size());
48  for(auto const& c: cv) {
49  tokens_.emplace_back( iCol.consumes<int>(c));
50  }
51 
52  auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
53  eventTimes_.reserve(tv.size());
54  for(auto t: tv) {
55  eventTimes_.push_back( static_cast<useconds_t>(t*1E6)); // seconds (per the "eventTimes" comment) -> microseconds for usleep
56  }
57  }
58 
 // Calls getByToken for every registered token, then sleeps for the entry of
 // eventTimes_ selected by the event number.
 // NOTE(review): the declaration of the handle `h` (listing line 61) is not
 // shown in this listing.
59  void
60  getAndSleep(edm::Event const& e) const {
62  for(auto const&t: tokens_) {
63  e.getByToken(t,h);
64  }
65  // Event numbers start at 1, so subtract 1 to get a zero-based index; the modulo wraps it around eventTimes_.
 // NOTE(review): an empty eventTimes_ makes the modulo below divide by zero — confirm the configuration always supplies at least one time.
66  usleep( eventTimes_[ (e.id().event()-1) % eventTimes_.size()]);
67  }
68 
 // Adds the parameters read by the constructor to desc: "consumes" (given an
 // empty default here) and "eventTimes" (no default given here).
69  static void fillDescription(edm::ParameterSetDescription& desc) {
70  desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
71  desc.add<std::vector<double>>("eventTimes")->setComment("The time, in seconds, for how long the module should sleep each event. The index to use is based on a modulo of size of the list applied to the Event ID number.");
72  }
73 
74  private:
75  std::vector<edm::EDGetTokenT<int>> tokens_; // one token per "consumes" entry
76  std::vector<useconds_t> eventTimes_; // sleep durations passed to usleep
77 
78  };
79  }
80 //--------------------------------------------------------------------
81 //
82 // Produces an IntProduct instance.
83 //
85 public:
87  value_(p.getParameter<int>("ivalue")),
88  sleeper_(p, consumesCollector()),
89  token_{produces<int>()}
90  {
91  }
92  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
93 
94  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
95 
96 private:
97  const int value_;
98  Sleeper sleeper_;
100 };
101 
102 void
104  // EventSetup is not used.
105  sleeper_.getAndSleep(e);
106 
107  e.emplace(token_,value_);
108 }
109 
110 void
113 
114  desc.add<int>("ivalue")->setComment("Value to put into Event");
115  Sleeper::fillDescription(desc);
116 
117  descriptions.addDefault(desc);
118 }
119 
// edm::one producer using the SharedResources ability; it registers the
// resource named by the "resource" parameter and holds a Sleeper.
// NOTE(review): the constructor signature (listing line 122) and the token_
// member declaration (listing line 136) are not shown in this listing.
120  class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
121  public:
123  value_(p.getParameter<int>("ivalue")), // read from the "ivalue" parameter
124  sleeper_(p, consumesCollector()), // Sleeper reads "consumes" and "eventTimes" from the same ParameterSet
125  token_{produces<int>()}
126  {
127  usesResource(p.getParameter<std::string>("resource")); // declare the shared resource named in the configuration
128  }
129  void produce( edm::Event& e, edm::EventSetup const& c) override;
130 
131  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
132 
133  private:
134  const int value_; // value of the "ivalue" parameter
135  Sleeper sleeper_;
137  };
138 
139  void
141  // EventSetup is not used.
142  sleeper_.getAndSleep(e);
143 
144  e.emplace(token_,value_);
145  }
146 
147  void
150 
151  desc.add<int>("ivalue")->setComment("Value to put into Event");
152  desc.add<std::string>("resource",std::string())->setComment("The name of the resource that is being shared");
153  Sleeper::fillDescription(desc);
154 
155  descriptions.addDefault(desc);
156  }
157 
159  public:
161  sleeper_(p, consumesCollector())
162  {
163  }
164  void analyze( edm::Event const& e, edm::EventSetup const& c) override;
165 
166  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
167 
168  private:
169  Sleeper sleeper_;
170  };
171 
172  void
174  // EventSetup is not used.
175  sleeper_.getAndSleep(e);
176  }
177 
178  void
181 
182  Sleeper::fillDescription(desc);
183 
184  descriptions.addDefault(desc);
185  }
186 
187  /*
188  The SleepingServer is configured to wait until it has accumulated 'nWaitingEvents' events before starting to run.
189  On a call to asyncWork:
190  - the data is added to the stream's slot and then the waiting thread is notified
191  - if the server is waiting on threads:
192  - it wakes up and sleeps for 'initTime'
193  - it then checks whether another event was pushed and, if so, continues the sleep loop
194  - once all sleeps are done, it checks whether enough events have contacted it and, if so, sleeps for the longest 'workTime' duration given
195  - when done, it sleeps for each event's 'finishTime' and, when it wakes, sends the callback
196  - when all callbacks have been sent, it goes back to check whether there are waiting events
197  - if there are not enough waiting events, it goes back to waiting on a condition variable
198 
199  The SleepingServer keeps track of the number of active Streams by counting the streamBeginLumi and streamEndLumi calls that have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting.
200 
201  */
203  public:
205  nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents"))
206  {
207  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
208  auto const nStreams =iBounds.maxNumberOfStreams();
209  waitingStreams_.reserve(nStreams);
210  waitTimesPerStream_.resize(nStreams);
211  waitingTaskPerStream_.resize(nStreams);
212  });
213 
214  iAR.watchPreEndJob([this]() {
215  stopProcessing_ = true;
216  condition_.notify_one();
217  serverThread_->join();
218  });
219  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) {
220  ++activeStreams_;
221  });
222  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
223  --activeStreams_;
224  condition_.notify_one();
225  });
226 
227  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); } );
228  }
229 
// Queues one stream's request for the server thread.
//   id         - stream making the request; id.value() indexes the per-stream slots
//   iTask      - holder stored in the stream's slot; threadWork later calls doneWaiting on it
//   initTime, workTime, finishTime - stored as slots [0], [1], [2]; threadWork passes them to usleep
230  void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
 // The per-stream slots are filled before the lock is taken; only waitingStreams_ is modified under mutex_.
231  waitTimesPerStream_[id.value()]={{initTime,workTime,finishTime}};
232  waitingTaskPerStream_[id.value()]=std::move(iTask);
233  {
234  std::lock_guard<std::mutex> lk{mutex_};
235  waitingStreams_.push_back(id.value());
236  }
 // Notify after the lock_guard scope has ended.
237  condition_.notify_one();
238  }
239 
240  private:
242  if(stopProcessing_) {
243  return true;
244  }
245  if(waitingStreams_.size() >= nWaitingEvents_) {
246  return true;
247  }
248  //every running stream is now waiting
249  return waitingStreams_.size() == activeStreams_;
250  }
251 
// Body of the server thread started in the constructor. Loops until
// stopProcessing_ is set: waits on condition_ until readyToDoSomething()
// returns true, takes the queued stream indices, then sleeps to simulate the
// three phases of external work and signals each stream's waiting task.
// NOTE(review): stopProcessing_ and activeStreams_ are atomics changed outside
// mutex_ (see the watchPreEndJob / watchPreStreamEndLumi callbacks), so a
// notify issued between a predicate check and the subsequent block could be
// missed — confirm whether this can stall shutdown.
252  void threadWork() {
253  while(not stopProcessing_.load()) {
254  std::vector<int> streamsToProcess;
255  {
256  std::unique_lock<std::mutex> lk(mutex_);
257  condition_.wait(lk, [this]() {
258  return readyToDoSomething();
259  });
 // Take everything queued so far while still holding the lock; waitingStreams_ receives the empty vector.
260  swap(streamsToProcess,waitingStreams_);
261  }
 // The wait may have ended because of a stop request rather than queued work.
262  if(stopProcessing_) {
263  break;
264  }
 // NOTE(review): if the predicate is satisfied while nothing is queued, streamsToProcess is empty and this iteration does no blocking work — confirm this cannot spin.
265  long longestTime = 0;
266  //simulate filling the external device
 // Sleeps for each stream's init time ([0]) in turn and records the largest work time ([1]).
267  for(auto i: streamsToProcess) {
268  auto const& v=waitTimesPerStream_[i];
269  if(v[1]>longestTime) {
270  longestTime = v[1];
271  }
272  usleep(v[0]);
273  }
274  //simulate running external device
 // A single sleep for the whole batch, using the largest work time found above.
275  usleep(longestTime);
276 
277  //simulate copying data back
 // Sleeps for each stream's finish time ([2]), then signals that stream's task with an empty exception_ptr.
278  for(auto i: streamsToProcess) {
279  auto const& v=waitTimesPerStream_[i];
280  usleep(v[2]);
281  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
282  }
283  }
 // Loop has exited: drop any task holders still stored in the per-stream slots.
284  waitingTaskPerStream_.clear();
285  }
286  const unsigned int nWaitingEvents_;
287  std::unique_ptr<std::thread> serverThread_;
288  std::vector<int> waitingStreams_;
289  std::vector<std::array<long,3>> waitTimesPerStream_;
290  std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
292  std::condition_variable condition_;
293  std::atomic<unsigned int> activeStreams_{0};
294  std::atomic<bool> stopProcessing_{false};
295  };
296 
// edm::global producer using the ExternalWork ability (acquire + produce).
// The constructor reads three per-event time lists from the configuration;
// acquire and produce are defined outside the class.
// NOTE(review): the constructor signature (listing line 299) and the token_
// member declaration (listing line 340) are not shown in this listing.
297  class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
298  public:
300  value_(p.getParameter<int>("ivalue")), // read from the "ivalue" parameter
301  sleeper_(p, consumesCollector()), // Sleeper reads "consumes" and "eventTimes" from the same ParameterSet
302  token_{produces<int>()}
303  {
 // Each list below is scaled by 1E6 and cast to useconds_t before being stored in a vector<long>.
 // NOTE(review): units are presumably seconds, as for Sleeper's "eventTimes" — these parameters carry no comment; confirm.
304  {
305  auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
306  initTimes_.reserve(tv.size());
307  for(auto t: tv) {
308  initTimes_.push_back( static_cast<useconds_t>(t*1E6));
309  }
310  }
311  {
312  auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
313  workTimes_.reserve(tv.size());
314  for(auto t: tv) {
315  workTimes_.push_back( static_cast<useconds_t>(t*1E6));
316  }
317  }
318  {
319  auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
320  finishTimes_.reserve(tv.size());
321  for(auto t: tv) {
322  finishTimes_.push_back( static_cast<useconds_t>(t*1E6));
323  }
324  }
 // The three lists must have equal length; assert is compiled out when NDEBUG is defined.
325  assert(finishTimes_.size() == initTimes_.size());
326  assert(workTimes_.size() == initTimes_.size());
327  }
328  void acquire(edm::StreamID, edm::Event const & e, edm::EventSetup const& c, edm::WaitingTaskWithArenaHolder holder) const override;
329 
330  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
331 
332  static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);
333 
334  private:
335  std::vector<long> initTimes_; // from "serviceInitTimes"
336  std::vector<long> workTimes_; // from "serviceWorkTimes"
337  std::vector<long> finishTimes_; // from "serviceFinishTimes"
338  const int value_; // value of the "ivalue" parameter
339  Sleeper sleeper_;
341  };
342 
343  void
345  // EventSetup is not used.
346  sleeper_.getAndSleep(e);
348  auto index = (e.id().event()-1) % initTimes_.size();
349  server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
350  }
351 
352  void
354  e.emplace(token_,value_);
355  }
356 
357  void
360 
361  desc.add<int>("ivalue")->setComment("Value to put into Event");
362  desc.add<std::vector<double>>("serviceInitTimes");
363  desc.add<std::vector<double>>("serviceWorkTimes");
364  desc.add<std::vector<double>>("serviceFinishTimes");
365  Sleeper::fillDescription(desc);
366 
367  descriptions.addDefault(desc);
368  }
369 
370 }
376 
T getParameter(std::string const &) const
EventNumber_t event() const
Definition: EventID.h:41
static boost::mutex mutex
Definition: Proxy.cc:11
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
void watchPreallocate(Preallocate::slot_type const &iSlot)
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
OneSleepingAnalyzer(edm::ParameterSet const &p)
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
bool getByToken(EDGetToken token, Handle< PROD > &result) const
Definition: Event.h:579
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:17
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
cv
Definition: cuy.py:364
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
std::vector< useconds_t > eventTimes_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:116
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::Event &e, edm::EventSetup const &c) override
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
void addDefault(ParameterSetDescription const &psetDescription)
SleepingProducer(edm::ParameterSet const &p)
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::unique_ptr< std::thread > serverThread_
const edm::EDPutTokenT< int > token_
const edm::EDPutTokenT< int > token_
ParameterDescriptionBase * add(U const &iLabel, T const &value)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
const unsigned int nWaitingEvents_
std::vector< edm::EDGetTokenT< int > > tokens_
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:113
std::condition_variable condition_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
OrphanHandle< PROD > emplace(EDPutTokenT< PROD > token, Args &&...args)
puts a new product
Definition: Event.h:453
OneSleepingProducer(edm::ParameterSet const &p)
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
edm::EventID id() const
Definition: EventBase.h:60
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
void analyze(edm::Event const &e, edm::EventSetup const &c) override
const edm::EDPutTokenT< int > token_
virtual example_stream void analyze(const edm::Event &, const edm::EventSetup &) override
def move(src, dest)
Definition: eostools.py:511
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)