CMS 3D CMS Logo

SerialTaskQueueChain.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h
2 #define FWCore_Concurrency_SerialTaskQueueChain_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Concurrency
6 // Class : SerialTaskQueueChain
7 //
16 //
17 // Original Author: root
18 // Created: Mon, 15 Aug 2016 18:04:02 GMT
19 //
20 
21 // system include files
22 #include <cassert>
23 #include <memory>
24 #include <vector>
25 
26 // user include files
28 
29 // forward declarations
30 namespace edm {
// Body of class SerialTaskQueueChain: holds an ordered list of shared
// SerialTaskQueue pointers plus an atomic counter of outstanding tasks.
// NOTE(review): this is a numbered listing with gaps; the `class` line and
// several declaration/comment lines (e.g. 31, 33, 37-39, 42, 48, 50-54) are
// absent from this extraction.
32  public:
// Takes the queue list by value and moves it into m_queues; the counter
// starts at its in-class default of 0.
34  explicit SerialTaskQueueChain(std::vector<std::shared_ptr<SerialTaskQueue>> iQueues)
35  : m_queues(std::move(iQueues)) {}
36 
// Initializer list of what appears to be the move constructor (its signature
// line is not in this listing). The queues are moved; the counter is copied
// via load(), so iOld's counter keeps its value.
40  : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}
41 
// Body of what appears to be the move assignment operator (signature line not
// in this listing). The counter transfer is a separate load() followed by
// store(), i.e. two atomic operations rather than one.
43  m_queues = std::move(iOld.m_queues);
44  m_outstandingTasks.store(iOld.m_outstandingTasks.load());
45  return *this;
46  }
47 
49 
// Hands iAction to the chain; defined below the class.
55  template <typename T>
56  void push(T&& iAction);
57 
59 
// Variant of push() that also waits for iAction; defined below the class.
66  template <typename T>
67  void pushAndWait(T&& iAction);
68 
// Returns the current counter value (implicit atomic load).
69  unsigned long outstandingTasks() const { return m_outstandingTasks; }
// Returns how many queues this chain holds.
70  std::size_t numberOfQueues() const { return m_queues.size(); }
71 
72  private:
73  // ---------- member data --------------------------------
// Queues in the order tasks are passed through them (index 0 first).
74  std::vector<std::shared_ptr<SerialTaskQueue>> m_queues;
// Decremented by the sentry in actionToRun; no increment is visible in this
// listing. NOTE(review): presumably incremented in push() — confirm.
75  std::atomic<unsigned long> m_outstandingTasks{0};
76 
// Forwards a task from queue iIndex-1 to queue iIndex (see definition below).
77  template <typename T>
78  void passDownChain(unsigned int iIndex, T&& iAction);
79 
// Runs iAction and then resumes the paused queues (see definition below).
80  template <typename T>
81  void actionToRun(T&& iAction);
82  };
83 
// Hands iAction to the first queue of the chain, wrapped in a lambda that
// either runs it directly (one queue) or starts the hand-off through the
// remaining queues (several queues).
// NOTE(review): the listing numbering jumps from 85 to 87 inside this
// function, so one original body line is absent from this extraction.
// NOTE(review): the lambdas capture `this`, so the chain presumably has to
// outlive the queued task — confirm against callers.
84  template <typename T>
85  void SerialTaskQueueChain::push(T&& iAction) {
// Single queue: no hand-off needed, the queued lambda calls actionToRun with
// its own copy of iAction.
87  if (m_queues.size() == 1) {
88  m_queues[0]->push([this, iAction]() mutable { this->actionToRun(iAction); });
89  } else {
// An empty chain also reaches this branch (size 0 != 1) and trips the assert.
90  assert(!m_queues.empty());
// Several queues: the task on queue 0 continues the hand-off at index 1.
91  m_queues[0]->push([this, iAction]() mutable { this->passDownChain(1, iAction); });
92  }
93  }
94 
// Body of the waiting variant: wraps iAction in a lambda given to push(),
// calls wait_for_all() on a TBB task, and afterwards rethrows any exception
// that iAction raised.
// NOTE(review): listing line 96 (presumably the
// `void SerialTaskQueueChain::pushAndWait(T&& iAction) {` signature) is absent
// from this extraction.
95  template <typename T>
// Deleter for the wait task below; calls tbb::task::destroy on it.
97  auto destry = [](tbb::task* iTask) { tbb::task::destroy(*iTask); };
98 
// Root empty_task used only as something to wait on; destroyed via `destry`
// when waitTask goes out of scope.
99  std::unique_ptr<tbb::task, decltype(destry)> waitTask(new (tbb::task::allocate_root()) tbb::empty_task, destry);
// NOTE(review): 3 is paired with exactly two decrement_ref_count() calls
// visible below (the sentry inside the pushed lambda and the call after
// push()); presumably wait_for_all() returns once the count reaches 1 —
// confirm against the TBB task documentation.
100  waitTask->set_ref_count(3);
101 
// Filled in by the pushed lambda (captured by reference) if iAction throws.
102  std::exception_ptr ptr;
103  auto waitTaskPtr = waitTask.get();
104  push([waitTaskPtr, iAction, &ptr]() {
105  //must wait until exception ptr would be set
// The sentry is declared before the try block, so its deleter (the ref-count
// decrement) runs at lambda exit, after the catch handler has assigned ptr.
106  auto dec = [](tbb::task* iTask) { iTask->decrement_ref_count(); };
107  std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr, dec);
108  try {
109  iAction();
110  } catch (...) {
111  ptr = std::current_exception();
112  }
113  });
114 
// Second of the two decrements, made by the calling thread itself.
115  waitTask->decrement_ref_count();
116  waitTask->wait_for_all();
117 
// ptr is only read here, after wait_for_all() has returned.
118  if (ptr) {
119  std::rethrow_exception(ptr);
120  }
121  }
122 
// One hand-off step: pauses the queue at iQueueIndex-1 and pushes a lambda
// onto the queue at iQueueIndex. For the last queue that lambda calls
// actionToRun; otherwise it calls passDownChain again with the next index.
// iQueueIndex must be at least 1 (it is unsigned and iQueueIndex - 1 is used
// as an index); push() starts the hand-off with 1.
123  template <typename T>
124  void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, T&& iAction) {
125  //Have to be sure the queue associated to this running task
126  // does not attempt to start another task
// The matching resume() calls are made by the sentry in actionToRun.
127  m_queues[iQueueIndex - 1]->pause();
128  //is this the last queue?
129  if (iQueueIndex + 1 == m_queues.size()) {
130  m_queues[iQueueIndex]->push([this, iAction]() mutable { this->actionToRun(iAction); });
131  } else {
// Not the last queue: the lambda captures the next index and a copy of
// iAction, and continues the hand-off from there.
132  auto nextQueue = iQueueIndex + 1;
133  m_queues[iQueueIndex]->push([this, nextQueue, iAction]() mutable { this->passDownChain(nextQueue, iAction); });
134  }
135  }
136 
// Body of the final step: runs iAction under a scope guard that resumes the
// queues and decrements the outstanding-task counter afterwards.
// NOTE(review): listing line 138 (presumably the
// `void SerialTaskQueueChain::actionToRun(T&& iAction) {` signature) is absent
// from this extraction.
137  template <typename T>
139  //even if an exception happens we will resume the queues.
// This alias is not referenced in the visible lines of this body.
140  using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
141  auto sentryAction = [](SerialTaskQueueChain* iChain) {
142  auto& vec = iChain->m_queues;
// Starts one past rbegin(), so the last queue is skipped and the remaining
// queues (the ones passDownChain paused) are resumed from back to front.
// With a single queue the loop body never runs.
// NOTE(review): rbegin() + 1 assumes m_queues is non-empty — confirm.
143  for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
144  (*it)->resume();
145  }
146  --(iChain->m_outstandingTasks);
147  };
148 
// unique_ptr is used purely as a scope guard here: its "deleter" is
// sentryAction, so `this` is not destroyed. It is constructed before
// iAction() so the deleter also runs if iAction throws.
149  std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
150  iAction();
151  }
152 } // namespace edm
153 
154 #endif
unsigned long outstandingTasks() const
SerialTaskQueueChain & operator=(const SerialTaskQueueChain &)=delete
def destroy(e)
Definition: pyrootRender.py:15
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
void push(T &&iAction)
asynchronously pushes functor iAction into queue
std::atomic< unsigned long > m_outstandingTasks
std::size_t numberOfQueues() const
SerialTaskQueueChain(SerialTaskQueueChain &&iOld)
SerialTaskQueueChain & operator=(SerialTaskQueueChain &&iOld)
HLT enums.
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
SerialTaskQueueChain(std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
long double T
void passDownChain(unsigned int iIndex, T &&iAction)
def move(src, dest)
Definition: eostools.py:511