CMS 3D CMS Logo

SerialTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueue_h
2 #define FWCore_Concurrency_SerialTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : SerialTaskQueue
7 //
49 //
50 // Original Author: Chris Jones
51 // Created: Thu Feb 21 11:14:39 CST 2013
52 // $Id$
53 //
54 
55 // system include files
56 #include <atomic>
57 #include <cassert>
58 
59 #include "tbb/task.h"
60 #include "tbb/concurrent_queue.h"
61 
62 // user include files
63 
64 // forward declarations
65 namespace edm {
67  {
68  public:
71  m_pauseCount{0}
72  { }
73 
75  m_tasks(std::move(iOther.m_tasks)),
76  m_taskChosen(iOther.m_taskChosen.exchange(false)),
77  m_pauseCount(iOther.m_pauseCount.exchange(0))
78  {
79  assert(m_tasks.empty() and m_taskChosen == false);
80  }
82 
83  // ---------- const member functions ---------------------
85 
88  bool isPaused() const { return m_pauseCount.load()!=0;}
89 
90  // ---------- member functions ---------------------------
92 
100  bool pause() {
101  return 1 == ++m_pauseCount;
102  }
103 
105 
112  bool resume();
113 
115 
121  template<typename T>
122  void push(const T& iAction);
123 
125 
132  template<typename T>
133  void pushAndWait(const T& iAction);
134 
136 
144  template<typename T>
145  tbb::task* pushAndGetNextTaskToRun(const T& iAction);
146 
147  private:
148  SerialTaskQueue(const SerialTaskQueue&) = delete;
149  const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
150 
152  class TaskBase : public tbb::task {
153  friend class SerialTaskQueue;
155 
156  protected:
157  tbb::task* finishedTask();
158  private:
159  void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue;}
160 
162  };
163 
164  template< typename T>
165  class QueuedTask : public TaskBase {
166  public:
167  QueuedTask( const T& iAction):
168  m_action(iAction) {}
169 
170  private:
171  tbb::task* execute() override;
172 
174  };
175 
176  friend class TaskBase;
177 
178  void pushTask(TaskBase*);
179  tbb::task* pushAndGetNextTask(TaskBase*);
180  tbb::task* finishedTask();
181  //returns nullptr if a task is already being processed
183 
184  void pushAndWait(tbb::empty_task* iWait,TaskBase*);
185 
186 
187  // ---------- member data --------------------------------
188  tbb::concurrent_queue<TaskBase*> m_tasks;
189  std::atomic<bool> m_taskChosen;
190  std::atomic<unsigned long> m_pauseCount;
191  };
192 
193  template<typename T>
194  void SerialTaskQueue::push(const T& iAction) {
195  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
196  pTask->setQueue(this);
197  pushTask(pTask);
198  }
199 
200  template<typename T>
201  void SerialTaskQueue::pushAndWait(const T& iAction) {
202  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
203  waitTask->set_ref_count(2);
204  QueuedTask<T>* pTask{ new (waitTask->allocate_child()) QueuedTask<T>{iAction} };
205  pTask->setQueue(this);
206  pushAndWait(waitTask,pTask);
207  }
208 
209  template<typename T>
210  tbb::task* SerialTaskQueue::pushAndGetNextTaskToRun(const T& iAction) {
211  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
212  pTask->setQueue(this);
213  return pushAndGetNextTask(pTask);
214  }
215 
216  inline
217  tbb::task*
219 
220  template <typename T>
221  tbb::task*
223  try {
224  this->m_action();
225  } catch(...) {}
226  return this->finishedTask();
227  }
228 
229 }
230 
231 #endif
const SerialTaskQueue & operator=(const SerialTaskQueue &)=delete
void setQueue(SerialTaskQueue *iQueue)
tbb::task * finishedTask()
void pushAndWait(const T &iAction)
Synchronously pushes functor iAction into the queue.
std::atomic< unsigned long > m_pauseCount
tbb::task * pushAndGetNextTask(TaskBase *)
#define nullptr
void pushTask(TaskBase *)
bool resume()
Resumes processing if the queue was paused.
bool isPaused() const
Checks to see if the queue has been paused.
SerialTaskQueue(SerialTaskQueue &&iOther)
tbb::task * execute() override
TaskBase * pickNextTask()
std::atomic< bool > m_taskChosen
HLT enums.
void push(const T &iAction)
Asynchronously pushes functor iAction into the queue.
long double T
tbb::concurrent_queue< TaskBase * > m_tasks
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511
tbb::task * pushAndGetNextTaskToRun(const T &iAction)
Asynchronously pushes functor iAction into the queue and finds the next task to execute.