CMS 3D CMS Logo

WaitingThreadPool.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_WaitingThreadPool_h
2 #define FWCore_Concurrency_WaitingThreadPool_h
3 
7 
#include <condition_variable>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
11 
12 namespace edm {
13  namespace impl {
14  class WaitingThread {
15  public:
16  WaitingThread();
17  ~WaitingThread() noexcept;
18 
19  WaitingThread(WaitingThread const&) = delete;
21  WaitingThread(WaitingThread&&) = delete;
22  WaitingThread& operator=(WaitingThread const&) = delete;
23 
24  template <typename F, typename G>
26  F&& func,
27  G&& errorContextFunc,
28  std::shared_ptr<WaitingThread> thisPtr) {
29  std::unique_lock lk(mutex_);
30  func_ = [holder = std::move(holder),
31  func = std::forward<F>(func),
32  errorContext = std::forward<G>(errorContextFunc)]() mutable {
33  try {
34  convertException::wrap([&func]() { func(); });
35  } catch (cms::Exception& e) {
36  e.addContext(errorContext());
37  // doneWaiting() is intentionally not called here. The
38  // reference count decrement must be done only in
39  // threadLoop() (see the comment there)
40  holder.presetTaskAsFailed(std::current_exception());
41  }
42  };
43  thisPtr_ = std::move(thisPtr);
44  cond_.notify_one();
45  }
46 
47  private:
48  void stopThread() {
49  std::unique_lock lk(mutex_);
50  stopThread_ = true;
51  cond_.notify_one();
52  }
53 
54  void threadLoop() noexcept;
55 
56  std::thread thread_;
58  std::condition_variable cond_;
60  // The purpose of thisPtr_ is to keep the WaitingThread object
61  // outside of the WaitingThreadPool until the func_ has returned.
64  };
65  } // namespace impl
66 
67  // Provides a mechanism to run the function 'func' asynchronously,
68  // i.e. without the calling thread having to wait for func() to
69  // return. The func should do as little work (outside of the TBB
70  // thread pool) as possible. The func must terminate eventually. The
71  // intended use case is blocking synchronization calls with external
72  // entities, where the calling thread is suspended while waiting.
73  //
74  // func() is run in a thread that belongs to a different pool of
75  // threads than the calling thread. This is remotely similar to
76  // std::async(), but instead of dealing with std::future objects it
77  // takes an edm::WaitingTaskWithArenaHolder object, which is signaled
78  // when func() returns or throws an exception.
79  //
80  // The caller is responsible for keeping the WaitingThreadPool
81  // object alive at least until all asynchronous calls have finished.
83  public:
84  WaitingThreadPool() = default;
85  WaitingThreadPool(WaitingThreadPool const&) = delete;
89 
98  template <typename F, typename G>
99  void runAsync(WaitingTaskWithArenaHolder holder, F&& func, G&& errorContextFunc) {
100  auto thread = pool_.makeOrGet([]() { return std::make_unique<impl::WaitingThread>(); });
101  thread->run(std::move(holder), std::forward<F>(func), std::forward<G>(errorContextFunc), std::move(thread));
102  }
103 
104  private:
106  };
107 } // namespace edm
108 
109 #endif
static std::mutex mutex
Definition: Proxy.cc:8
edm::ReusableObjectHolder< impl::WaitingThread > pool_
#define CMS_THREAD_GUARD(_var_)
void runAsync(WaitingTaskWithArenaHolder holder, F &&func, G &&errorContextFunc)
std::shared_ptr< WaitingThread > thisPtr_
void run(WaitingTaskWithArenaHolder holder, F &&func, G &&errorContextFunc, std::shared_ptr< WaitingThread > thisPtr)
void presetTaskAsFailed(std::exception_ptr iExcept) noexcept
HLT enums.
WaitingThread & operator=(WaitingThread &&)=delete
auto wrap(F iFunc) -> decltype(iFunc())
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::condition_variable cond_
std::function< void()> func_
def move(src, dest)
Definition: eostools.py:511