CMS 3D CMS Logo

SerialTaskQueueChain.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h
2 #define FWCore_Concurrency_SerialTaskQueueChain_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/Concurrency
6 // Class : SerialTaskQueueChain
7 //
16 //
17 // Original Author: root
18 // Created: Mon, 15 Aug 2016 18:04:02 GMT
19 //
20 
21 // system include files
22 #include <cassert>
23 #include <memory>
24 #include <vector>
25 
26 // user include files
28 
29 // forward declarations
30 namespace edm {
32  {
33 
34  public:
36  explicit SerialTaskQueueChain(std::vector<std::shared_ptr<SerialTaskQueue>> iQueues):
37  m_queues(std::move(iQueues)) {}
38 
42  m_queues(std::move(iOld.m_queues)),
43  m_outstandingTasks{ iOld.m_outstandingTasks.load() } {}
44 
46  m_queues = std::move(iOld.m_queues);
47  m_outstandingTasks.store( iOld.m_outstandingTasks.load());
48  return *this;
49  }
50 
51 
52 
54 
60  template<typename T>
61  void push(const T& iAction);
62 
64 
71  template<typename T>
72  void pushAndWait(const T& iAction);
73 
74  unsigned long outstandingTasks() const { return m_outstandingTasks; }
75  std::size_t numberOfQueues() const {return m_queues.size(); }
76  private:
77 
78  // ---------- member data --------------------------------
79  std::vector<std::shared_ptr<SerialTaskQueue>> m_queues;
80  std::atomic<unsigned long> m_outstandingTasks{0};
81 
82  template<typename T>
83  void passDownChain(unsigned int iIndex, const T& iAction);
84 
85  template<typename T>
86  void actionToRun(const T& iAction);
87 
88  };
89 
90  template<typename T>
91  void SerialTaskQueueChain::push(const T& iAction) {
93  if(m_queues.size() == 1) {
94  m_queues[0]->push( [this,iAction]() {this->actionToRun(iAction);} );
95  } else {
96  assert(!m_queues.empty());
97  m_queues[0]->push([this, iAction]() {
98  this->passDownChain(1, iAction);
99  });
100  }
101  }
102 
103  template<typename T>
104  void SerialTaskQueueChain::pushAndWait(const T& iAction) {
105  auto destry = [](tbb::task* iTask) { tbb::task::destroy(*iTask); };
106 
107  std::unique_ptr<tbb::task, decltype(destry)> waitTask( new (tbb::task::allocate_root()) tbb::empty_task, destry );
108  waitTask->set_ref_count(3);
109 
110  std::exception_ptr ptr;
111  auto waitTaskPtr = waitTask.get();
112  push([waitTaskPtr, iAction,&ptr](){
113  //must wait until exception ptr would be set
114  auto dec = [](tbb::task* iTask){ iTask->decrement_ref_count();};
115  std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr,dec);
116  try {
117  iAction();
118  }catch(...) {
119  ptr = std::current_exception();
120  }
121  });
122 
123  waitTask->decrement_ref_count();
124  waitTask->wait_for_all();
125 
126  if(ptr) {
127  std::rethrow_exception(ptr);
128  }
129  }
130 
131  template<typename T>
132  void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, const T& iAction) {
133  //Have to be sure the queue associated to this running task
134  // does not attempt to start another task
135  m_queues[iQueueIndex-1]->pause();
136  //is this the last queue?
137  if(iQueueIndex +1 == m_queues.size()) {
138  m_queues[iQueueIndex]->push([this,iAction]{ this->actionToRun(iAction); });
139  } else {
140  auto nextQueue = iQueueIndex+1;
141  m_queues[iQueueIndex]->push([this, nextQueue, iAction]() {
142  this->passDownChain(nextQueue, iAction);
143  });
144  }
145  }
146 
147  template<typename T>
148  void SerialTaskQueueChain::actionToRun(const T& iAction) {
149  //even if an exception happens we will resume the queues.
150  using Queues= std::vector<std::shared_ptr<SerialTaskQueue>>;
151  auto sentryAction = [](SerialTaskQueueChain* iChain) {
152  auto& vec = iChain->m_queues;
153  for(auto it = vec.rbegin()+1; it != vec.rend(); ++it) {
154  (*it)->resume();
155  }
156  --(iChain->m_outstandingTasks);
157  };
158 
159  std::unique_ptr<SerialTaskQueueChain,decltype(sentryAction)> sentry( this, sentryAction);
160  iAction();
161  }
162 }
163 
164 
165 #endif
unsigned long outstandingTasks() const
SerialTaskQueueChain & operator=(const SerialTaskQueueChain &)=delete
def destroy(e)
Definition: pyrootRender.py:13
void push(const T &iAction)
asynchronously pushes functor iAction into queue
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
void actionToRun(const T &iAction)
std::atomic< unsigned long > m_outstandingTasks
std::size_t numberOfQueues() const
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
SerialTaskQueueChain(SerialTaskQueueChain &&iOld)
SerialTaskQueueChain & operator=(SerialTaskQueueChain &&iOld)
HLT enums.
SerialTaskQueueChain(std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
long double T
void passDownChain(unsigned int iIndex, const T &iAction)
def move(src, dest)
Definition: eostools.py:510