CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
SerialTaskQueueChain.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h
2 #define FWCore_Concurrency_SerialTaskQueueChain_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Concurrency
6 // Class : SerialTaskQueueChain
7 //
16 //
17 // Original Author: root
18 // Created: Mon, 15 Aug 2016 18:04:02 GMT
19 //
20 
21 // system include files
22 #include <cassert>
23 #include <memory>
24 #include <vector>
25 
26 // user include files
29 
30 // forward declarations
31 namespace edm {
33  public:
35  explicit SerialTaskQueueChain(std::vector<std::shared_ptr<SerialTaskQueue>> iQueues)
36  : m_queues(std::move(iQueues)) {}
37 
41  : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}
42 
44  m_queues = std::move(iOld.m_queues);
45  m_outstandingTasks.store(iOld.m_outstandingTasks.load());
46  return *this;
47  }
48 
50 
56  template <typename T>
57  void push(tbb::task_group& iGroup, T&& iAction);
58 
59  unsigned long outstandingTasks() const { return m_outstandingTasks; }
60  std::size_t numberOfQueues() const { return m_queues.size(); }
61 
62  private:
63  // ---------- member data --------------------------------
64  std::vector<std::shared_ptr<SerialTaskQueue>> m_queues;
65  std::atomic<unsigned long> m_outstandingTasks{0};
66 
67  template <typename T>
68  void passDownChain(unsigned int iIndex, tbb::task_group& iGroup, T&& iAction);
69 
70  template <typename T>
71  void actionToRun(T&& iAction);
72  };
73 
74  template <typename T>
75  void SerialTaskQueueChain::push(tbb::task_group& iGroup, T&& iAction) {
77  if (m_queues.size() == 1) {
78  m_queues[0]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
79  } else {
80  assert(!m_queues.empty());
81  m_queues[0]->push(iGroup, [this, &iGroup, iAction]() mutable { this->passDownChain(1, iGroup, iAction); });
82  }
83  }
84 
85  template <typename T>
86  void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, tbb::task_group& iGroup, T&& iAction) {
87  //Have to be sure the queue associated to this running task
88  // does not attempt to start another task
89  m_queues[iQueueIndex - 1]->pause();
90  //is this the last queue?
91  if (iQueueIndex + 1 == m_queues.size()) {
92  m_queues[iQueueIndex]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
93  } else {
94  auto nextQueue = iQueueIndex + 1;
95  m_queues[iQueueIndex]->push(
96  iGroup, [this, nextQueue, &iGroup, iAction]() mutable { this->passDownChain(nextQueue, iGroup, iAction); });
97  }
98  }
99 
100  template <typename T>
102  //even if an exception happens we will resume the queues.
103  using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
104  auto sentryAction = [](SerialTaskQueueChain* iChain) {
105  auto& vec = iChain->m_queues;
106  for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
107  (*it)->resume();
108  }
109  --(iChain->m_outstandingTasks);
110  };
111 
112  std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
113  iAction();
114  }
115 } // namespace edm
116 
117 #endif
unsigned long outstandingTasks() const
SerialTaskQueueChain & operator=(const SerialTaskQueueChain &)=delete
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
assert(be >=bs)
void passDownChain(unsigned int iIndex, tbb::task_group &iGroup, T &&iAction)
def move
Definition: eostools.py:511
std::atomic< unsigned long > m_outstandingTasks
std::size_t numberOfQueues() const
SerialTaskQueueChain(SerialTaskQueueChain &&iOld)
SerialTaskQueueChain & operator=(SerialTaskQueueChain &&iOld)
SerialTaskQueueChain(std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
long double T