CMS 3D CMS Logo

TimeStudyModules.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Modules
4 // Class : TimeStudyModules
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu, 22 Mar 2018 16:23:48 GMT
11 //
12 
13 // system include files
14 #include <unistd.h>
15 #include <vector>
16 #include <thread>
17 #include <atomic>
18 #include <condition_variable>
19 #include <mutex>
20 
21 // user include files
27 
34 
36 
40 
41 namespace timestudy {
42  namespace {
43  struct Sleeper {
44  Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol) {
45  auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
46  tokens_.reserve(cv.size());
47  for (auto const& c : cv) {
48  tokens_.emplace_back(iCol.consumes<int>(c));
49  }
50 
51  auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
52  eventTimes_.reserve(tv.size());
53  for (auto t : tv) {
54  eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
55  }
56  }
57 
58  void getAndSleep(edm::Event const& e) const {
59  for (auto const& t : tokens_) {
60  (void)e.getHandle(t);
61  }
62  //Event number minimum value is 1
63  usleep(eventTimes_[(e.id().event() - 1) % eventTimes_.size()]);
64  }
65 
66  static void fillDescription(edm::ParameterSetDescription& desc) {
67  desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
68  desc.add<std::vector<double>>("eventTimes")
69  ->setComment(
70  "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
71  "modulo of size of the list applied to the Event ID number.");
72  }
73 
74  private:
75  std::vector<edm::EDGetTokenT<int>> tokens_;
76  std::vector<useconds_t> eventTimes_;
77  };
78  } // namespace
79  //--------------------------------------------------------------------
80  //
81  // Produces an IntProduct instance.
82  //
84  public:
86  : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
87  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
88 
89  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
90 
91  private:
92  const int value_;
93  Sleeper sleeper_;
95  };
96 
98  // EventSetup is not used.
99  sleeper_.getAndSleep(e);
100 
101  e.emplace(token_, value_);
102  }
103 
106 
107  desc.add<int>("ivalue")->setComment("Value to put into Event");
108  Sleeper::fillDescription(desc);
109 
110  descriptions.addDefault(desc);
111  }
112 
113  class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
114  public:
116  : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
117  usesResource(p.getParameter<std::string>("resource"));
118  }
119  void produce(edm::Event& e, edm::EventSetup const& c) override;
120 
121  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
122 
123  private:
124  const int value_;
125  Sleeper sleeper_;
127  };
128 
130  // EventSetup is not used.
131  sleeper_.getAndSleep(e);
132 
133  e.emplace(token_, value_);
134  }
135 
138 
139  desc.add<int>("ivalue")->setComment("Value to put into Event");
140  desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
141  Sleeper::fillDescription(desc);
142 
143  descriptions.addDefault(desc);
144  }
145 
147  public:
149  void analyze(edm::Event const& e, edm::EventSetup const& c) override;
150 
151  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
152 
153  private:
154  Sleeper sleeper_;
155  };
156 
158  // EventSetup is not used.
159  sleeper_.getAndSleep(e);
160  }
161 
164 
165  Sleeper::fillDescription(desc);
166 
167  descriptions.addDefault(desc);
168  }
169 
170  /*
171  The SleepingServer is configured to wait to accumulate X events before starting to run.
172  On a call to asyncWork
173  -the data will be added to the streams' slot then the waiting thread will be informed
174  -if the server is waiting on threads
175  - it wakes up and sleeps for 'initTime'
176  - it then checks to see if another event was pushed and if it does it continues to do the sleep loop
177  - once all sleeps are done, it checks whether enough events have contacted it and, if so, sleeps for the longest 'workTime' duration given
178  - when done, it sleeps for each event's 'finishTime' and, when it wakes, sends that event's callback
179  - when all callbacks have been sent, it goes back to check if there are waiting events
180  - if there are not enough waiting events, it goes back to waiting on a condition variable
181 
182  The SleepingServer keeps track of the number of active Streams by counting the streamBeginLumi and streamEndLumi calls that have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting.
183 
184  */
186  public:
188  : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
189  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
190  auto const nStreams = iBounds.maxNumberOfStreams();
191  waitingStreams_.reserve(nStreams);
194  });
195 
196  iAR.watchPreEndJob([this]() {
197  stopProcessing_ = true;
198  condition_.notify_one();
199  serverThread_->join();
200  });
201  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
202  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
203  --activeStreams_;
204  condition_.notify_one();
205  });
206 
207  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
208  }
209 
210  void asyncWork(
211  edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
212  waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
213  waitingTaskPerStream_[id.value()] = std::move(iTask);
214  {
215  std::lock_guard<std::mutex> lk{mutex_};
216  waitingStreams_.push_back(id.value());
217  }
218  condition_.notify_one();
219  }
220 
221  private:
223  if (stopProcessing_) {
224  return true;
225  }
226  if (waitingStreams_.size() >= nWaitingEvents_) {
227  return true;
228  }
229  //every running stream is now waiting
230  return waitingStreams_.size() == activeStreams_;
231  }
232 
233  void threadWork() {
234  while (not stopProcessing_.load()) {
235  std::vector<int> streamsToProcess;
236  {
237  std::unique_lock<std::mutex> lk(mutex_);
238  condition_.wait(lk, [this]() { return readyToDoSomething(); });
239  swap(streamsToProcess, waitingStreams_);
240  }
241  if (stopProcessing_) {
242  break;
243  }
244  long longestTime = 0;
245  //simulate filling the external device
246  for (auto i : streamsToProcess) {
247  auto const& v = waitTimesPerStream_[i];
248  if (v[1] > longestTime) {
249  longestTime = v[1];
250  }
251  usleep(v[0]);
252  }
253  //simulate running external device
254  usleep(longestTime);
255 
256  //simulate copying data back
257  for (auto i : streamsToProcess) {
258  auto const& v = waitTimesPerStream_[i];
259  usleep(v[2]);
260  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
261  }
262  }
263  waitingTaskPerStream_.clear();
264  }
265  const unsigned int nWaitingEvents_;
266  std::unique_ptr<std::thread> serverThread_;
267  std::vector<int> waitingStreams_;
268  std::vector<std::array<long, 3>> waitTimesPerStream_;
269  std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
271  std::condition_variable condition_;
272  std::atomic<unsigned int> activeStreams_{0};
273  std::atomic<bool> stopProcessing_{false};
274  };
275 
276  class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
277  public:
279  : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
280  {
281  auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
282  initTimes_.reserve(tv.size());
283  for (auto t : tv) {
284  initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
285  }
286  }
287  {
288  auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
289  workTimes_.reserve(tv.size());
290  for (auto t : tv) {
291  workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
292  }
293  }
294  {
295  auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
296  finishTimes_.reserve(tv.size());
297  for (auto t : tv) {
298  finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
299  }
300  }
301  assert(finishTimes_.size() == initTimes_.size());
302  assert(workTimes_.size() == initTimes_.size());
303  }
304  void acquire(edm::StreamID,
305  edm::Event const& e,
306  edm::EventSetup const& c,
307  edm::WaitingTaskWithArenaHolder holder) const override;
308 
309  void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
310 
311  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
312 
313  private:
314  std::vector<long> initTimes_;
315  std::vector<long> workTimes_;
316  std::vector<long> finishTimes_;
317  const int value_;
318  Sleeper sleeper_;
320  };
321 
323  edm::Event const& e,
324  edm::EventSetup const&,
325  edm::WaitingTaskWithArenaHolder holder) const {
326  // EventSetup is not used.
327  sleeper_.getAndSleep(e);
329  auto index = (e.id().event() - 1) % initTimes_.size();
330  server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
331  }
332 
334  e.emplace(token_, value_);
335  }
336 
339 
340  desc.add<int>("ivalue")->setComment("Value to put into Event");
341  desc.add<std::vector<double>>("serviceInitTimes");
342  desc.add<std::vector<double>>("serviceWorkTimes");
343  desc.add<std::vector<double>>("serviceFinishTimes");
344  Sleeper::fillDescription(desc);
345 
346  descriptions.addDefault(desc);
347  }
348 
349 } // namespace timestudy
timestudy::SleepingServer::readyToDoSomething
bool readyToDoSomething()
Definition: TimeStudyModules.cc:222
ConfigurationDescriptions.h
timestudy::ExternalWorkSleepingProducer::initTimes_
std::vector< long > initTimes_
Definition: TimeStudyModules.cc:314
edm::StreamID
Definition: StreamID.h:30
timestudy::SleepingServer::serverThread_
std::unique_ptr< std::thread > serverThread_
Definition: TimeStudyModules.cc:266
timestudy::ExternalWorkSleepingProducer::ExternalWorkSleepingProducer
ExternalWorkSleepingProducer(edm::ParameterSet const &p)
Definition: TimeStudyModules.cc:278
EDAnalyzer.h
mps_fire.i
i
Definition: mps_fire.py:428
timestudy::SleepingServer::stopProcessing_
std::atomic< bool > stopProcessing_
Definition: TimeStudyModules.cc:273
EDProducer.h
cuy.cv
cv
Definition: cuy.py:364
timestudy::OneSleepingAnalyzer::analyze
void analyze(edm::Event const &e, edm::EventSetup const &c) override
Definition: TimeStudyModules.cc:157
edm::EDPutTokenT< int >
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::swap
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:117
timestudy::SleepingServer::threadWork
void threadWork()
Definition: TimeStudyModules.cc:233
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
DEFINE_FWK_SERVICE
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:96
cms::cuda::assert
assert(be >=bs)
edm::EDConsumerBase::consumesCollector
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
Definition: EDConsumerBase.cc:47
edm::one::EDProducer
Definition: EDProducer.h:30
edm::one::EDAnalyzer
Definition: EDAnalyzer.h:30
findQualityFiles.v
v
Definition: findQualityFiles.py:179
timestudy::OneSleepingProducer::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: TimeStudyModules.cc:136
timestudy::ExternalWorkSleepingProducer::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: TimeStudyModules.cc:337
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:371
timestudy::OneSleepingProducer::value_
const int value_
Definition: TimeStudyModules.cc:124
timestudy::OneSleepingProducer
Definition: TimeStudyModules.cc:113
edm::WaitingTaskWithArenaHolder
Definition: WaitingTaskWithArenaHolder.h:34
MakerMacros.h
timestudy::SleepingProducer::value_
const int value_
Definition: TimeStudyModules.cc:92
edm::ActivityRegistry::watchPreStreamBeginLumi
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:431
DEFINE_FWK_MODULE
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
edm::StreamContext
Definition: StreamContext.h:31
Service.h
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
tokens_
std::vector< edm::EDGetTokenT< int > > tokens_
Definition: TimeStudyModules.cc:75
ParameterSetDescription.h
timestudy::SleepingServer::waitingStreams_
std::vector< int > waitingStreams_
Definition: TimeStudyModules.cc:267
EDGetToken.h
edm::ActivityRegistry::watchPreEndJob
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
Definition: ActivityRegistry.h:164
timestudy::SleepingProducer::token_
const edm::EDPutTokenT< int > token_
Definition: TimeStudyModules.cc:94
timestudy::SleepingServer::asyncWork
void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)
Definition: TimeStudyModules.cc:210
ServiceMaker.h
edm::global::EDProducer
Definition: EDProducer.h:32
timestudy::OneSleepingAnalyzer::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: TimeStudyModules.cc:162
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
timestudy::SleepingProducer::sleeper_
Sleeper sleeper_
Definition: TimeStudyModules.cc:93
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
timestudy::SleepingProducer::SleepingProducer
SleepingProducer(edm::ParameterSet const &p)
Definition: TimeStudyModules.cc:85
edm::service::SystemBounds
Definition: SystemBounds.h:29
edm::ParameterSet
Definition: ParameterSet.h:47
Event.h
timestudy::SleepingServer::mutex_
std::mutex mutex_
Definition: TimeStudyModules.cc:270
contentValuesFiles.server
server
Definition: contentValuesFiles.py:37
EDPutToken.h
timestudy::ExternalWorkSleepingProducer::value_
const int value_
Definition: TimeStudyModules.cc:317
edm::ActivityRegistry::watchPreStreamEndLumi
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:445
timestudy::SleepingServer::waitTimesPerStream_
std::vector< std::array< long, 3 > > waitTimesPerStream_
Definition: TimeStudyModules.cc:268
timestudy::SleepingServer::activeStreams_
std::atomic< unsigned int > activeStreams_
Definition: TimeStudyModules.cc:272
timestudy::OneSleepingAnalyzer
Definition: TimeStudyModules.cc:146
timestudy::ExternalWorkSleepingProducer::acquire
void acquire(edm::StreamID, edm::Event const &e, edm::EventSetup const &c, edm::WaitingTaskWithArenaHolder holder) const override
Definition: TimeStudyModules.cc:322
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
timestudy::SleepingProducer::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: TimeStudyModules.cc:104
mutex
static std::mutex mutex
Definition: Proxy.cc:8
timestudy::SleepingProducer
Definition: TimeStudyModules.cc:83
timestudy::OneSleepingAnalyzer::sleeper_
Sleeper sleeper_
Definition: TimeStudyModules.cc:154
edm::service::SystemBounds::maxNumberOfStreams
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
timestudy::OneSleepingProducer::OneSleepingProducer
OneSleepingProducer(edm::ParameterSet const &p)
Definition: TimeStudyModules.cc:115
edm::EventSetup
Definition: EventSetup.h:58
eventTimes_
std::vector< useconds_t > eventTimes_
Definition: TimeStudyModules.cc:76
timestudy::OneSleepingAnalyzer::OneSleepingAnalyzer
OneSleepingAnalyzer(edm::ParameterSet const &p)
Definition: TimeStudyModules.cc:148
InputTag.h
timestudy::SleepingProducer::produce
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
Definition: TimeStudyModules.cc:97
timestudy::ExternalWorkSleepingProducer
Definition: TimeStudyModules.cc:276
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:144
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
timestudy::OneSleepingProducer::token_
const edm::EDPutTokenT< int > token_
Definition: TimeStudyModules.cc:126
eostools.move
def move(src, dest)
Definition: eostools.py:511
timestudy::OneSleepingProducer::sleeper_
Sleeper sleeper_
Definition: TimeStudyModules.cc:125
relativeConstraints.value
value
Definition: relativeConstraints.py:53
timestudy::ExternalWorkSleepingProducer::finishTimes_
std::vector< long > finishTimes_
Definition: TimeStudyModules.cc:316
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
funct::void
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
timestudy::SleepingServer::nWaitingEvents_
const unsigned int nWaitingEvents_
Definition: TimeStudyModules.cc:265
timestudy::ExternalWorkSleepingProducer::workTimes_
std::vector< long > workTimes_
Definition: TimeStudyModules.cc:315
timestudy::SleepingServer::waitingTaskPerStream_
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
Definition: TimeStudyModules.cc:269
timestudy::OneSleepingProducer::produce
void produce(edm::Event &e, edm::EventSetup const &c) override
Definition: TimeStudyModules.cc:129
ConsumesCollector.h
ParameterSet.h
timestudy
Definition: TimeStudyModules.cc:41
timestudy::SleepingServer
Definition: TimeStudyModules.cc:185
EDProducer.h
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:46
timestudy::SleepingServer::SleepingServer
SleepingServer(edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
Definition: TimeStudyModules.cc:187
edm::Event
Definition: Event.h:73
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::ConfigurationDescriptions::addDefault
void addDefault(ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:99
SystemBounds.h
edm::ConsumesCollector
Definition: ConsumesCollector.h:45
timestudy::ExternalWorkSleepingProducer::produce
void produce(edm::StreamID, edm::Event &e, edm::EventSetup const &c) const override
Definition: TimeStudyModules.cc:333
timestudy::SleepingServer::condition_
std::condition_variable condition_
Definition: TimeStudyModules.cc:271
timestudy::ExternalWorkSleepingProducer::sleeper_
Sleeper sleeper_
Definition: TimeStudyModules.cc:318
timestudy::ExternalWorkSleepingProducer::token_
const edm::EDPutTokenT< int > token_
Definition: TimeStudyModules.cc:319
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37