CMS 3D CMS Logo

SerialTaskQueueChain.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h
2 #define FWCore_Concurrency_SerialTaskQueueChain_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Concurrency
6 // Class : SerialTaskQueueChain
7 //
16 //
17 // Original Author: root
18 // Created: Mon, 15 Aug 2016 18:04:02 GMT
19 //
20 
21 // system include files
22 #include <cassert>
23 #include <memory>
24 #include <vector>
25 
26 // user include files
29 
30 // forward declarations
31 namespace edm {
33  public:
35  explicit SerialTaskQueueChain(std::vector<std::shared_ptr<SerialTaskQueue>> iQueues)
36  : m_queues(std::move(iQueues)) {}
37 
41  : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}
42 
44  m_queues = std::move(iOld.m_queues);
45  m_outstandingTasks.store(iOld.m_outstandingTasks.load());
46  return *this;
47  }
48 
50 
56  template <typename T>
57  void push(T&& iAction);
58 
60 
67  template <typename T>
68  void pushAndWait(T&& iAction);
69 
70  unsigned long outstandingTasks() const { return m_outstandingTasks; }
71  std::size_t numberOfQueues() const { return m_queues.size(); }
72 
73  private:
74  // ---------- member data --------------------------------
75  std::vector<std::shared_ptr<SerialTaskQueue>> m_queues;
76  std::atomic<unsigned long> m_outstandingTasks{0};
77 
78  template <typename T>
79  void passDownChain(unsigned int iIndex, T&& iAction);
80 
81  template <typename T>
82  void actionToRun(T&& iAction);
83  };
84 
85  template <typename T>
86  void SerialTaskQueueChain::push(T&& iAction) {
88  if (m_queues.size() == 1) {
89  m_queues[0]->push([this, iAction]() mutable { this->actionToRun(iAction); });
90  } else {
91  assert(!m_queues.empty());
92  m_queues[0]->push([this, iAction]() mutable { this->passDownChain(1, iAction); });
93  }
94  }
95 
96  template <typename T>
98  auto destry = [](tbb::task* iTask) { tbb::task::destroy(*iTask); };
99 
100  std::unique_ptr<tbb::task, decltype(destry)> waitTask(new (tbb::task::allocate_root()) tbb::empty_task, destry);
101  waitTask->set_ref_count(3);
102 
103  std::exception_ptr ptr;
104  auto waitTaskPtr = waitTask.get();
105  push([waitTaskPtr, iAction, &ptr]() {
106  //must wait until exception ptr would be set
107  auto dec = [](tbb::task* iTask) { iTask->decrement_ref_count(); };
108  std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr, dec);
109  // Caught exception is rethrown further below.
110  CMS_SA_ALLOW try { iAction(); } catch (...) {
111  ptr = std::current_exception();
112  }
113  });
114 
115  waitTask->decrement_ref_count();
116  waitTask->wait_for_all();
117 
118  if (ptr) {
119  std::rethrow_exception(ptr);
120  }
121  }
122 
123  template <typename T>
124  void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, T&& iAction) {
125  //Have to be sure the queue associated to this running task
126  // does not attempt to start another task
127  m_queues[iQueueIndex - 1]->pause();
128  //is this the last queue?
129  if (iQueueIndex + 1 == m_queues.size()) {
130  m_queues[iQueueIndex]->push([this, iAction]() mutable { this->actionToRun(iAction); });
131  } else {
132  auto nextQueue = iQueueIndex + 1;
133  m_queues[iQueueIndex]->push([this, nextQueue, iAction]() mutable { this->passDownChain(nextQueue, iAction); });
134  }
135  }
136 
137  template <typename T>
139  //even if an exception happens we will resume the queues.
140  using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
141  auto sentryAction = [](SerialTaskQueueChain* iChain) {
142  auto& vec = iChain->m_queues;
143  for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
144  (*it)->resume();
145  }
146  --(iChain->m_outstandingTasks);
147  };
148 
149  std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
150  iAction();
151  }
152 } // namespace edm
153 
154 #endif
edm::SerialTaskQueueChain::actionToRun
void actionToRun(T &&iAction)
Definition: SerialTaskQueueChain.h:138
edm::SerialTaskQueueChain::SerialTaskQueueChain
SerialTaskQueueChain()
Definition: SerialTaskQueueChain.h:34
edm
HLT enums.
Definition: AlignableModifier.h:19
cms::cuda::assert
assert(be >=bs)
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::SerialTaskQueueChain::m_outstandingTasks
std::atomic< unsigned long > m_outstandingTasks
Definition: SerialTaskQueueChain.h:76
edm::SerialTaskQueueChain::pushAndWait
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:97
SerialTaskQueue.h
edm::SerialTaskQueueChain::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:86
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::SerialTaskQueueChain::operator=
SerialTaskQueueChain & operator=(const SerialTaskQueueChain &)=delete
edm::SerialTaskQueueChain::numberOfQueues
std::size_t numberOfQueues() const
Definition: SerialTaskQueueChain.h:71
edm::SerialTaskQueueChain::SerialTaskQueueChain
SerialTaskQueueChain(std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
Definition: SerialTaskQueueChain.h:35
thread_safety_macros.h
edm::SerialTaskQueueChain::SerialTaskQueueChain
SerialTaskQueueChain(SerialTaskQueueChain &&iOld)
Definition: SerialTaskQueueChain.h:40
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
edm::SerialTaskQueueChain::operator=
SerialTaskQueueChain & operator=(SerialTaskQueueChain &&iOld)
Definition: SerialTaskQueueChain.h:43
cms::cuda::device::unique_ptr
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
Definition: device_unique_ptr.h:33
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
T
long double T
Definition: Basic3DVectorLD.h:48
edm::SerialTaskQueueChain::m_queues
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
Definition: SerialTaskQueueChain.h:75
edm::SerialTaskQueueChain
Definition: SerialTaskQueueChain.h:32
TauDecayModes.dec
dec
Definition: TauDecayModes.py:143
edm::SerialTaskQueueChain::outstandingTasks
unsigned long outstandingTasks() const
Definition: SerialTaskQueueChain.h:70
edm::SerialTaskQueueChain::passDownChain
void passDownChain(unsigned int iIndex, T &&iAction)
Definition: SerialTaskQueueChain.h:124