CMS 3D CMS Logo

WaitingServer.cc
Go to the documentation of this file.
1 
2 #include "WaitingServer.h"
3 
4 #include <exception>
5 
6 namespace edmtest {
7  namespace test_acquire {
8 
9  void WaitingServer::requestValuesAsync(unsigned int dataID,
10  std::vector<int> const* iIn,
11  std::vector<int>* iOut,
13  auto& streamData = m_perStream.at(dataID);
14 
15  streamData.in_ = iIn;
16  streamData.out_ = iOut;
17  streamData.holder_ = std::move(holder);
18  {
19  std::lock_guard<std::mutex> guard(m_mutex);
20  m_waitingStreams.push_back(dataID);
21  }
22  m_cond.notify_one(); //wakes up the server thread
23  }
24 
26  m_thread = std::make_unique<std::thread>([this]() { serverDoWork(); });
27  }
28 
30  m_shouldStop = true;
31  if (m_thread) {
32  m_thread->join();
33  m_thread.reset();
34  }
35  }
36 
38 
40  while (not m_shouldStop) {
41  std::vector<unsigned int> streamsToUse;
42  {
43  std::unique_lock<std::mutex> lk(m_mutex);
44 
45  m_cond.wait_for(lk, std::chrono::seconds(m_secondsToWait), [this]() -> bool { return readyForWork(); });
46 
47  // Once we know which streams have given us data
48  // we can release the lock and let other streams
49  // set their data
50  streamsToUse.swap(m_waitingStreams);
51  lk.unlock();
52  }
53 
54  // Here is the work that the server does for the modules
55  // it will just add 1 to each value it has been given
56  for (auto index : streamsToUse) {
57  auto& streamData = m_perStream.at(index);
58 
59  std::exception_ptr exceptionPtr;
60  try {
61  for (auto v : *streamData.in_) {
62  streamData.out_->push_back(v + 1);
63  }
64  } catch (...) {
65  exceptionPtr = std::current_exception();
66  }
67  streamData.holder_.doneWaiting(exceptionPtr);
68  }
69  }
70  }
71  } // namespace test_acquire
72 } // namespace edmtest
std::vector< StreamData > m_perStream
Definition: WaitingServer.h:50
const unsigned int m_minNumStreamsBeforeDoingWork
Definition: WaitingServer.h:52
std::condition_variable m_cond
Definition: WaitingServer.h:48
std::unique_ptr< std::thread > m_thread
Definition: WaitingServer.h:49
std::vector< unsigned int > m_waitingStreams
Definition: WaitingServer.h:51
def move(src, dest)
Definition: eostools.py:511
void requestValuesAsync(unsigned int dataID, std::vector< int > const *iIn, std::vector< int > *iOut, edm::WaitingTaskWithArenaHolder holder)
Definition: WaitingServer.cc:9