CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
SerialTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueue_h
2 #define FWCore_Concurrency_SerialTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : SerialTaskQueue
7 //
49 //
50 // Original Author: Chris Jones
51 // Created: Thu Feb 21 11:14:39 CST 2013
52 // $Id$
53 //
54 
55 // system include files
56 #include <atomic>
57 
58 #include "tbb/task.h"
59 #include "tbb/concurrent_queue.h"
60 
61 // user include files
62 
63 // forward declarations
64 namespace edm {
66  {
67  public:
70  m_pauseCount{0}
71  { }
72 
73  // ---------- const member functions ---------------------
75 
78  bool isPaused() const { return m_pauseCount.load()==0;}
79 
80  // ---------- member functions ---------------------------
82 
90  bool pause() {
91  return 1 == ++m_pauseCount;
92  }
93 
95 
102  bool resume();
103 
105 
111  template<typename T>
112  void push(const T& iAction);
113 
115 
122  template<typename T>
123  void pushAndWait(const T& iAction);
124 
126 
134  template<typename T>
135  tbb::task* pushAndGetNextTaskToRun(const T& iAction);
136 
137  private:
138  SerialTaskQueue(const SerialTaskQueue&) = delete;
139  const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
140 
142  class TaskBase : public tbb::task {
143  friend class SerialTaskQueue;
144  TaskBase(): m_queue(0) {}
145 
146  protected:
147  tbb::task* finishedTask();
148  private:
149  void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue;}
150 
152  };
153 
154  template< typename T>
155  class QueuedTask : public TaskBase {
156  public:
157  QueuedTask( const T& iAction):
158  m_action(iAction) {}
159 
160  private:
161  tbb::task* execute();
162 
164  };
165 
166  friend class TaskBase;
167 
168  void pushTask(TaskBase*);
169  tbb::task* pushAndGetNextTask(TaskBase*);
170  tbb::task* finishedTask();
171  //returns nullptr if a task is already being processed
173 
174  void pushAndWait(tbb::empty_task* iWait,TaskBase*);
175 
176 
177  // ---------- member data --------------------------------
178  tbb::concurrent_queue<TaskBase*> m_tasks;
179  std::atomic<bool> m_taskChosen;
180  std::atomic<unsigned long> m_pauseCount;
181  };
182 
183  template<typename T>
184  void SerialTaskQueue::push(const T& iAction) {
185  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
186  pTask->setQueue(this);
187  pushTask(pTask);
188  }
189 
190  template<typename T>
191  void SerialTaskQueue::pushAndWait(const T& iAction) {
192  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
193  waitTask->set_ref_count(2);
194  QueuedTask<T>* pTask{ new (waitTask->allocate_child()) QueuedTask<T>{iAction} };
195  pTask->setQueue(this);
196  pushAndWait(waitTask,pTask);
197  }
198 
199  template<typename T>
200  tbb::task* SerialTaskQueue::pushAndGetNextTaskToRun(const T& iAction) {
201  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
202  pTask->setQueue(this);
203  return pushAndGetNextTask(pTask);
204  }
205 
206  inline
207  tbb::task*
209 
210  template <typename T>
211  tbb::task*
213  try {
214  this->m_action();
215  } catch(...) {}
216  return this->finishedTask();
217  }
218 
219 }
220 
221 #endif
const SerialTaskQueue & operator=(const SerialTaskQueue &)=delete
void setQueue(SerialTaskQueue *iQueue)
tbb::task * finishedTask()
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
std::atomic< unsigned long > m_pauseCount
tbb::task * pushAndGetNextTask(TaskBase *)
void pushTask(TaskBase *)
bool resume()
Resumes processing if the queue was paused.
bool isPaused() const
Checks to see if the queue has been paused.
TaskBase * pickNextTask()
std::atomic< bool > m_taskChosen
void push(const T &iAction)
asynchronously pushes functor iAction into queue
volatile std::atomic< bool > shutdown_flag false
long double T
tbb::concurrent_queue< TaskBase * > m_tasks
bool pause()
Pauses processing of additional tasks from the queue.
tbb::task * pushAndGetNextTaskToRun(const T &iAction)
asynchronously pushes functor iAction into queue and finds next task to execute