CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
timestudy::SleepingServer Class Reference

Public Member Functions

void asyncWork (edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)
 
 SleepingServer (edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
 

Private Member Functions

bool readyToDoSomething ()
 
void threadWork ()
 

Private Attributes

std::atomic< unsigned int > activeStreams_ {0}
 
std::condition_variable condition_
 
std::mutex mutex_
 
const unsigned int nWaitingEvents_
 
std::unique_ptr< std::thread > serverThread_
 
std::atomic< bool > stopProcessing_ {false}
 
std::vector< int > waitingStreams_
 
std::vector< edm::WaitingTaskWithArenaHolderwaitingTaskPerStream_
 
std::vector< std::array< long, 3 > > waitTimesPerStream_
 

Detailed Description

Definition at line 202 of file TimeStudyModules.cc.

Constructor & Destructor Documentation

timestudy::SleepingServer::SleepingServer ( edm::ParameterSet const &  iPS,
edm::ActivityRegistry iAR 
)
inline

Definition at line 204 of file TimeStudyModules.cc.

References edm::service::SystemBounds::maxNumberOfStreams(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEndJob(), edm::ActivityRegistry::watchPreStreamBeginLumi(), and edm::ActivityRegistry::watchPreStreamEndLumi().

204  :
205  nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents"))
206  {
207  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
208  auto const nStreams =iBounds.maxNumberOfStreams();
209  waitingStreams_.reserve(nStreams);
210  waitTimesPerStream_.resize(nStreams);
211  waitingTaskPerStream_.resize(nStreams);
212  });
213 
214  iAR.watchPreEndJob([this]() {
215  stopProcessing_ = true;
216  condition_.notify_one();
217  serverThread_->join();
218  });
219  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) {
220  ++activeStreams_;
221  });
222  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
223  --activeStreams_;
224  condition_.notify_one();
225  });
226 
227  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); } );
228  }
void watchPreallocate(Preallocate::slot_type const &iSlot)
std::atomic< unsigned int > activeStreams_
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::unique_ptr< std::thread > serverThread_
const unsigned int nWaitingEvents_
std::condition_variable condition_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_

Member Function Documentation

void timestudy::SleepingServer::asyncWork ( edm::StreamID  id,
edm::WaitingTaskWithArenaHolder  iTask,
long  initTime,
long  workTime,
long  finishTime 
)
inline

Definition at line 230 of file TimeStudyModules.cc.

References eostools::move(), and relativeConstraints::value.

230  {
231  waitTimesPerStream_[id.value()]={{initTime,workTime,finishTime}};
232  waitingTaskPerStream_[id.value()]=std::move(iTask);
233  {
234  std::lock_guard<std::mutex> lk{mutex_};
235  waitingStreams_.push_back(id.value());
236  }
237  condition_.notify_one();
238  }
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::condition_variable condition_
std::vector< int > waitingStreams_
def move(src, dest)
Definition: eostools.py:511
bool timestudy::SleepingServer::readyToDoSomething ( )
inlineprivate

Definition at line 241 of file TimeStudyModules.cc.

241  {
242  if(stopProcessing_) {
243  return true;
244  }
245  if(waitingStreams_.size() >= nWaitingEvents_) {
246  return true;
247  }
248  //every running stream is now waiting
249  return waitingStreams_.size() == activeStreams_;
250  }
std::atomic< unsigned int > activeStreams_
const unsigned int nWaitingEvents_
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_
void timestudy::SleepingServer::threadWork ( )
inlineprivate

Definition at line 252 of file TimeStudyModules.cc.

References mps_fire::i, edm::swap(), and findQualityFiles::v.

252  {
253  while(not stopProcessing_.load()) {
254  std::vector<int> streamsToProcess;
255  {
256  std::unique_lock<std::mutex> lk(mutex_);
257  condition_.wait(lk, [this]() {
258  return readyToDoSomething();
259  });
260  swap(streamsToProcess,waitingStreams_);
261  }
262  if(stopProcessing_) {
263  break;
264  }
265  long longestTime = 0;
266  //simulate filling the external device
267  for(auto i: streamsToProcess) {
268  auto const& v=waitTimesPerStream_[i];
269  if(v[1]>longestTime) {
270  longestTime = v[1];
271  }
272  usleep(v[0]);
273  }
274  //simulate running external device
275  usleep(longestTime);
276 
277  //simulate copying data back
278  for(auto i: streamsToProcess) {
279  auto const& v=waitTimesPerStream_[i];
280  usleep(v[2]);
281  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
282  }
283  }
284  waitingTaskPerStream_.clear();
285  }
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:116
std::vector< std::array< long, 3 > > waitTimesPerStream_
std::condition_variable condition_
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_

Member Data Documentation

std::atomic<unsigned int> timestudy::SleepingServer::activeStreams_ {0}
private

Definition at line 293 of file TimeStudyModules.cc.

std::condition_variable timestudy::SleepingServer::condition_
private

Definition at line 292 of file TimeStudyModules.cc.

std::mutex timestudy::SleepingServer::mutex_
private

Definition at line 291 of file TimeStudyModules.cc.

const unsigned int timestudy::SleepingServer::nWaitingEvents_
private

Definition at line 286 of file TimeStudyModules.cc.

std::unique_ptr<std::thread> timestudy::SleepingServer::serverThread_
private

Definition at line 287 of file TimeStudyModules.cc.

std::atomic<bool> timestudy::SleepingServer::stopProcessing_ {false}
private

Definition at line 294 of file TimeStudyModules.cc.

std::vector<int> timestudy::SleepingServer::waitingStreams_
private

Definition at line 288 of file TimeStudyModules.cc.

std::vector<edm::WaitingTaskWithArenaHolder> timestudy::SleepingServer::waitingTaskPerStream_
private

Definition at line 290 of file TimeStudyModules.cc.

std::vector<std::array<long,3> > timestudy::SleepingServer::waitTimesPerStream_
private

Definition at line 289 of file TimeStudyModules.cc.