CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
timestudy::SleepingServer Class Reference

Public Member Functions

void asyncWork (edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime)
 
 SleepingServer (edm::ParameterSet const &iPS, edm::ActivityRegistry &iAR)
 

Private Member Functions

bool readyToDoSomething ()
 
void threadWork ()
 

Private Attributes

std::atomic< unsigned int > activeStreams_ {0}
 
std::condition_variable condition_
 
std::mutex mutex_
 
const unsigned int nWaitingEvents_
 
std::unique_ptr< std::thread > serverThread_
 
std::atomic< bool > stopProcessing_ {false}
 
std::vector< int > waitingStreams_
 
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
 
std::vector< std::array< long, 3 > > waitTimesPerStream_
 

Detailed Description

Definition at line 185 of file TimeStudyModules.cc.

Constructor & Destructor Documentation

timestudy::SleepingServer::SleepingServer ( edm::ParameterSet const &  iPS,
edm::ActivityRegistry &  iAR 
)
inline

Definition at line 187 of file TimeStudyModules.cc.

References edm::service::SystemBounds::maxNumberOfStreams(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreEndJob(), edm::ActivityRegistry::watchPreStreamBeginLumi(), and edm::ActivityRegistry::watchPreStreamEndLumi().

188  : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
189  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
190  auto const nStreams = iBounds.maxNumberOfStreams();
191  waitingStreams_.reserve(nStreams);
192  waitTimesPerStream_.resize(nStreams);
193  waitingTaskPerStream_.resize(nStreams);
194  });
195 
196  iAR.watchPreEndJob([this]() {
197  stopProcessing_ = true;
198  condition_.notify_one();
199  serverThread_->join();
200  });
201  iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
202  iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
203  --activeStreams_;
204  condition_.notify_one();
205  });
206 
207  serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
208  }
void watchPreallocate(Preallocate::slot_type const &iSlot)
std::atomic< unsigned int > activeStreams_
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
std::vector< std::array< long, 3 > > waitTimesPerStream_
void watchPreStreamEndLumi(PreStreamEndLumi::slot_type const &iSlot)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
std::unique_ptr< std::thread > serverThread_
const unsigned int nWaitingEvents_
std::condition_variable condition_
void watchPreStreamBeginLumi(PreStreamBeginLumi::slot_type const &iSlot)
void watchPreEndJob(PreEndJob::slot_type const &iSlot)
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_

Member Function Documentation

void timestudy::SleepingServer::asyncWork ( edm::StreamID  id,
edm::WaitingTaskWithArenaHolder  iTask,
long  initTime,
long  workTime,
long  finishTime 
)
inline

Definition at line 210 of file TimeStudyModules.cc.

References eostools::move(), and relativeConstraints::value.

211  {
212  waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
213  waitingTaskPerStream_[id.value()] = std::move(iTask);
214  {
215  std::lock_guard<std::mutex> lk{mutex_};
216  waitingStreams_.push_back(id.value());
217  }
218  condition_.notify_one();
219  }
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
std::vector< std::array< long, 3 > > waitTimesPerStream_
def move
Definition: eostools.py:511
std::condition_variable condition_
std::vector< int > waitingStreams_
bool timestudy::SleepingServer::readyToDoSomething ( )
inline private

Definition at line 222 of file TimeStudyModules.cc.

222  {
223  if (stopProcessing_) {
224  return true;
225  }
226  if (waitingStreams_.size() >= nWaitingEvents_) {
227  return true;
228  }
229  //every running stream is now waiting
230  return waitingStreams_.size() == activeStreams_;
231  }
std::atomic< unsigned int > activeStreams_
const unsigned int nWaitingEvents_
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_
void timestudy::SleepingServer::threadWork ( )
inline private

Definition at line 233 of file TimeStudyModules.cc.

References mps_fire::i, edm::swap(), and findQualityFiles::v.

233  {
234  while (not stopProcessing_.load()) {
235  std::vector<int> streamsToProcess;
236  {
237  std::unique_lock<std::mutex> lk(mutex_);
238  condition_.wait(lk, [this]() { return readyToDoSomething(); });
239  swap(streamsToProcess, waitingStreams_);
240  }
241  if (stopProcessing_) {
242  break;
243  }
244  long longestTime = 0;
245  //simulate filling the external device
246  for (auto i : streamsToProcess) {
247  auto const& v = waitTimesPerStream_[i];
248  if (v[1] > longestTime) {
249  longestTime = v[1];
250  }
251  usleep(v[0]);
252  }
253  //simulate running external device
254  usleep(longestTime);
255 
256  //simulate copying data back
257  for (auto i : streamsToProcess) {
258  auto const& v = waitTimesPerStream_[i];
259  usleep(v[2]);
260  waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
261  }
262  }
263  waitingTaskPerStream_.clear();
264  }
std::vector< edm::WaitingTaskWithArenaHolder > waitingTaskPerStream_
std::vector< std::array< long, 3 > > waitTimesPerStream_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:117
std::condition_variable condition_
std::vector< int > waitingStreams_
std::atomic< bool > stopProcessing_

Member Data Documentation

std::atomic<unsigned int> timestudy::SleepingServer::activeStreams_ {0}
private

Definition at line 272 of file TimeStudyModules.cc.

std::condition_variable timestudy::SleepingServer::condition_
private
std::mutex timestudy::SleepingServer::mutex_
private

Definition at line 270 of file TimeStudyModules.cc.

const unsigned int timestudy::SleepingServer::nWaitingEvents_
private

Definition at line 265 of file TimeStudyModules.cc.

std::unique_ptr<std::thread> timestudy::SleepingServer::serverThread_
private

Definition at line 266 of file TimeStudyModules.cc.

std::atomic<bool> timestudy::SleepingServer::stopProcessing_ {false}
private

Definition at line 273 of file TimeStudyModules.cc.

std::vector<int> timestudy::SleepingServer::waitingStreams_
private

Definition at line 267 of file TimeStudyModules.cc.

std::vector<edm::WaitingTaskWithArenaHolder> timestudy::SleepingServer::waitingTaskPerStream_
private

Definition at line 269 of file TimeStudyModules.cc.

std::vector<std::array<long, 3> > timestudy::SleepingServer::waitTimesPerStream_
private

Definition at line 268 of file TimeStudyModules.cc.