CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
timestudy::SleepingServer Class Reference

Public Member Functions

void asyncWork (edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)
 
 SleepingServer (edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
 

Private Member Functions

bool readyToDoSomething ()
 
void threadWork ()
 

Private Attributes

std::atomic< unsigned int > activeStreams_ {0}
 
std::condition_variable condition_
 
std::mutex mutex_
 
const unsigned int nWaitingEvents_
 
std::unique_ptr< std::thread > serverThread_
 
std::atomic< bool > stopProcessing_ {false}
 
std::vector< int > waitingStreams_
 
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
 
std::vector< std::array< long, 3 > > waitTimesPerStream_
 

Detailed Description

Definition at line 185 of file TimeStudyModules.cc.

Constructor & Destructor Documentation

◆ SleepingServer()

timestudy::SleepingServer::SleepingServer ( edm::ParameterSet const &  iPS,
edm::ActivityRegistry &  iAR
)
inline

Definition at line 187 of file TimeStudyModules.cc.

188  : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
189  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
190  auto const nStreams = iBounds.maxNumberOfStreams();
191  waitingStreams_.reserve(nStreams);
192  waitTimesPerStream_.resize(nStreams);
193  waitingTaskPerStream_.resize(nStreams);
194  });
195 
196  iAR.watchPreEndJob([this]() {
197  stopProcessing_ = true;
198  condition_.notify_one();
199  serverThread_->join();
200  });
201  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
202  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
203  --activeStreams_;
204  condition_.notify_one();
205  });
206 
207  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
208  }

References activeStreams_, condition_, edm::service::SystemBounds::maxNumberOfStreams(), serverThread_, stopProcessing_, threadWork(), waitingStreams_, waitingTaskPerStream_, waitTimesPerStream_, edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEndJob(), edm::ActivityRegistry::watchPreStreamBeginLumi(), and edm::ActivityRegistry::watchPreStreamEndLumi().

Member Function Documentation

◆ asyncWork()

void timestudy::SleepingServer::asyncWork ( edm::StreamID  id,
edm::WaitingTaskWithArenaHolder  iTask,
long  initTime,
long  workTime,
long  finishTime 
)
inline

Definition at line 210 of file TimeStudyModules.cc.

211  {
212  waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
213  waitingTaskPerStream_[id.value()] = std::move(iTask);
214  {
215  std::lock_guard<std::mutex> lk{mutex_};
216  waitingStreams_.push_back(id.value());
217  }
218  condition_.notify_one();
219  }

References condition_, eostools::move(), mutex_, relativeConstraints::value, waitingStreams_, waitingTaskPerStream_, and waitTimesPerStream_.

◆ readyToDoSomething()

bool timestudy::SleepingServer::readyToDoSomething ( )
inline private

Definition at line 222 of file TimeStudyModules.cc.

222  {
223  if (stopProcessing_) {
224  return true;
225  }
226  if (waitingStreams_.size() >= nWaitingEvents_) {
227  return true;
228  }
229  //every running stream is now waiting
230  return waitingStreams_.size() == activeStreams_;
231  }

References activeStreams_, nWaitingEvents_, stopProcessing_, and waitingStreams_.

Referenced by threadWork().

◆ threadWork()

void timestudy::SleepingServer::threadWork ( )
inline private

Definition at line 233 of file TimeStudyModules.cc.

233  {
234  while (not stopProcessing_.load()) {
235  std::vector<int> streamsToProcess;
236  {
237  std::unique_lock<std::mutex> lk(mutex_);
238  condition_.wait(lk, [this]() { return readyToDoSomething(); });
239  swap(streamsToProcess, waitingStreams_);
240  }
241  if (stopProcessing_) {
242  break;
243  }
244  long longestTime = 0;
245  //simulate filling the external device
246  for (auto i : streamsToProcess) {
247  auto const& v = waitTimesPerStream_[i];
248  if (v[1] > longestTime) {
249  longestTime = v[1];
250  }
251  usleep(v[0]);
252  }
253  //simulate running external device
254  usleep(longestTime);
255 
256  //simulate copying data back
257  for (auto i : streamsToProcess) {
258  auto const& v = waitTimesPerStream_[i];
259  usleep(v[2]);
260  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
261  }
262  }
263  waitingTaskPerStream_.clear();
264  }

References condition_, mps_fire::i, mutex_, readyToDoSomething(), stopProcessing_, edm::swap(), findQualityFiles::v, waitingStreams_, waitingTaskPerStream_, and waitTimesPerStream_.

Referenced by SleepingServer().

Member Data Documentation

◆ activeStreams_

std::atomic<unsigned int> timestudy::SleepingServer::activeStreams_ {0}
private

Definition at line 272 of file TimeStudyModules.cc.

Referenced by readyToDoSomething(), and SleepingServer().

◆ condition_

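std::condition_variable timestudy::SleepingServer::condition_
private

Definition at line 271 of file TimeStudyModules.cc.

Referenced by asyncWork(), SleepingServer(), and threadWork().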

◆ mutex_

std::mutex timestudy::SleepingServer::mutex_
private

Definition at line 270 of file TimeStudyModules.cc.

Referenced by asyncWork(), and threadWork().

◆ nWaitingEvents_

const unsigned int timestudy::SleepingServer::nWaitingEvents_
private

Definition at line 265 of file TimeStudyModules.cc.

Referenced by readyToDoSomething().

◆ serverThread_

std::unique_ptr<std::thread> timestudy::SleepingServer::serverThread_
private

Definition at line 266 of file TimeStudyModules.cc.

Referenced by SleepingServer().

◆ stopProcessing_

std::atomic<bool> timestudy::SleepingServer::stopProcessing_ {false}
private

Definition at line 273 of file TimeStudyModules.cc.

Referenced by readyToDoSomething(), SleepingServer(), and threadWork().

◆ waitingStreams_

std::vector<int> timestudy::SleepingServer::waitingStreams_
private

Definition at line 267 of file TimeStudyModules.cc.

Referenced by asyncWork(), readyToDoSomething(), SleepingServer(), and threadWork().

◆ waitingTaskPerStream_

std::vector<edm::WaitingTaskWithArenaHolder> timestudy::SleepingServer::waitingTaskPerStream_
private

Definition at line 269 of file TimeStudyModules.cc.

Referenced by asyncWork(), SleepingServer(), and threadWork().

◆ waitTimesPerStream_

std::vector<std::array<long, 3> > timestudy::SleepingServer::waitTimesPerStream_
private

Definition at line 268 of file TimeStudyModules.cc.

Referenced by asyncWork(), SleepingServer(), and threadWork().

timestudy::SleepingServer::readyToDoSomething
bool readyToDoSomething()
Definition: TimeStudyModules.cc:222
timestudy::SleepingServer::serverThread_
std::unique_ptr< std::thread > serverThread_
Definition: TimeStudyModules.cc:266
mps_fire.i
i
Definition: mps_fire.py:355
timestudy::SleepingServer::stopProcessing_
std::atomic< bool > stopProcessing_
Definition: TimeStudyModules.cc:273
edm::swap
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:117
timestudy::SleepingServer::threadWork
void threadWork()
Definition: TimeStudyModules.cc:233
findQualityFiles.v
v
Definition: findQualityFiles.py:179
edm::ActivityRegistry::watchPreStreamBeginLumi
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:373
edm::StreamContext
Definition: StreamContext.h:31
timestudy::SleepingServer::waitingStreams_
std::vector< int > waitingStreams_
Definition: TimeStudyModules.cc:267
edm::ActivityRegistry::watchPreEndJob
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
Definition: ActivityRegistry.h:162
edm::service::SystemBounds
Definition: SystemBounds.h:29
timestudy::SleepingServer::mutex_
std::mutex mutex_
Definition: TimeStudyModules.cc:270
edm::ActivityRegistry::watchPreStreamEndLumi
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:387
timestudy::SleepingServer::waitTimesPerStream_
std::vector< std::array< long, 3 > > waitTimesPerStream_
Definition: TimeStudyModules.cc:268
timestudy::SleepingServer::activeStreams_
std::atomic< unsigned int > activeStreams_
Definition: TimeStudyModules.cc:272
edm::service::SystemBounds::maxNumberOfStreams
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:142
eostools.move
def move(src, dest)
Definition: eostools.py:511
relativeConstraints.value
value
Definition: relativeConstraints.py:53
timestudy::SleepingServer::nWaitingEvents_
const unsigned int nWaitingEvents_
Definition: TimeStudyModules.cc:265
timestudy::SleepingServer::waitingTaskPerStream_
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
Definition: TimeStudyModules.cc:269
timestudy::SleepingServer::condition_
std::condition_variable condition_
Definition: TimeStudyModules.cc:271