CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
timestudy::SleepingServer Class Reference

Public Member Functions

void asyncWork (edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)
 
 SleepingServer (edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
 

Private Member Functions

bool readyToDoSomething ()
 
void threadWork ()
 

Private Attributes

std::atomic< unsigned int > activeStreams_ {0}
 
std::condition_variable condition_
 
std::mutex mutex_
 
const unsigned int nWaitingEvents_
 
std::unique_ptr< std::thread > serverThread_
 
std::atomic< bool > stopProcessing_ {false}
 
std::vector< int > waitingStreams_
 
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
 
std::vector< std::array< long, 3 > > waitTimesPerStream_
 

Detailed Description

Definition at line 200 of file TimeStudyModules.cc.

Constructor & Destructor Documentation

timestudy::SleepingServer::SleepingServer ( edm::ParameterSet const &  iPS,
edm::ActivityRegistry &  iAR 
)
inline

Definition at line 202 of file TimeStudyModules.cc.

References edm::service::SystemBounds::maxNumberOfStreams(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEndJob(), edm::ActivityRegistry::watchPreStreamBeginLumi(), and edm::ActivityRegistry::watchPreStreamEndLumi().

202  :
203  nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents"))
204  {
205  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
206  auto const nStreams =iBounds.maxNumberOfStreams();
207  waitingStreams_.reserve(nStreams);
208  waitTimesPerStream_.resize(nStreams);
209  waitingTaskPerStream_.resize(nStreams);
210  });
211 
212  iAR.watchPreEndJob([this]() {
213  stopProcessing_ = true;
214  condition_.notify_one();
215  serverThread_->join();
216  });
217  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) {
218  ++activeStreams_;
219  });
220  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
221  --activeStreams_;
222  condition_.notify_one();
223  });
224 
225  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); } );
226  }
void watchPreallocate(Preallocate::slot_type const &iSlot)
std::atomic< unsigned int > activeStreams_
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::unique_ptr< std::thread > serverThread_
const unsigned int nWaitingEvents_
std::condition_variable condition_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_

Member Function Documentation

void timestudy::SleepingServer::asyncWork ( edm::StreamID  id,
edm::WaitingTaskWithArenaHolder  iTask,
long  initTime,
long  workTime,
long  finishTime 
)
inline

Definition at line 228 of file TimeStudyModules.cc.

References eostools::move(), and relativeConstraints::value.

228  {
229  waitTimesPerStream_[id.value()]={{initTime,workTime,finishTime}};
230  waitingTaskPerStream_[id.value()]=std::move(iTask);
231  {
232  std::lock_guard<std::mutex> lk{mutex_};
233  waitingStreams_.push_back(id.value());
234  }
235  condition_.notify_one();
236  }
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::condition_variable condition_
std::vector< int > waitingStreams_
def move(src, dest)
Definition: eostools.py:510
bool timestudy::SleepingServer::readyToDoSomething ( )
inline private

Definition at line 239 of file TimeStudyModules.cc.

239  {
240  if(stopProcessing_) {
241  return true;
242  }
243  if(waitingStreams_.size() >= nWaitingEvents_) {
244  return true;
245  }
246  //every running stream is now waiting
247  return waitingStreams_.size() == activeStreams_;
248  }
std::atomic< unsigned int > activeStreams_
const unsigned int nWaitingEvents_
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_
void timestudy::SleepingServer::threadWork ( )
inline private

Definition at line 250 of file TimeStudyModules.cc.

References mps_fire::i, edm::swap(), and findQualityFiles::v.

250  {
251  while(not stopProcessing_.load()) {
252  std::vector<int> streamsToProcess;
253  {
254  std::unique_lock<std::mutex> lk(mutex_);
255  condition_.wait(lk, [this]() {
256  return readyToDoSomething();
257  });
258  swap(streamsToProcess,waitingStreams_);
259  }
260  if(stopProcessing_) {
261  break;
262  }
263  long longestTime = 0;
264  //simulate filling the external device
265  for(auto i: streamsToProcess) {
266  auto const& v=waitTimesPerStream_[i];
267  if(v[1]>longestTime) {
268  longestTime = v[1];
269  }
270  usleep(v[0]);
271  }
272  //simulate running external device
273  usleep(longestTime);
274 
275  //simulate copying data back
276  for(auto i: streamsToProcess) {
277  auto const& v=waitTimesPerStream_[i];
278  usleep(v[2]);
279  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
280  }
281  }
282  waitingTaskPerStream_.clear();
283  }
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:116
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::condition_variable condition_
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_

Member Data Documentation

std::atomic<unsigned int> timestudy::SleepingServer::activeStreams_ {0}
private

Definition at line 291 of file TimeStudyModules.cc.

std::condition_variable timestudy::SleepingServer::condition_
private

Definition at line 290 of file TimeStudyModules.cc.

std::mutex timestudy::SleepingServer::mutex_
private

Definition at line 289 of file TimeStudyModules.cc.

const unsigned int timestudy::SleepingServer::nWaitingEvents_
private

Definition at line 284 of file TimeStudyModules.cc.

std::unique_ptr<std::thread> timestudy::SleepingServer::serverThread_
private

Definition at line 285 of file TimeStudyModules.cc.

std::atomic<bool> timestudy::SleepingServer::stopProcessing_ {false}
private

Definition at line 292 of file TimeStudyModules.cc.

std::vector<int> timestudy::SleepingServer::waitingStreams_
private

Definition at line 286 of file TimeStudyModules.cc.

std::vector<edm::WaitingTaskWithArenaHolder> timestudy::SleepingServer::waitingTaskPerStream_
private

Definition at line 288 of file TimeStudyModules.cc.

std::vector<std::array<long,3> > timestudy::SleepingServer::waitTimesPerStream_
private

Definition at line 287 of file TimeStudyModules.cc.