CMS 3D CMS Logo

SerialTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueue_h
2 #define FWCore_Concurrency_SerialTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : SerialTaskQueue
7 //
49 //
50 // Original Author: Chris Jones
51 // Created: Thu Feb 21 11:14:39 CST 2013
52 // $Id$
53 //
54 
55 // system include files
56 #include <atomic>
57 
58 #include "tbb/task.h"
59 #include "tbb/concurrent_queue.h"
60 
61 // user include files
62 
63 // forward declarations
64 namespace edm {
66  {
67  public:
70  m_pauseCount{0}
71  { }
72 
74 
75  // ---------- const member functions ---------------------
77 
80  bool isPaused() const { return m_pauseCount.load()==0;}
81 
82  // ---------- member functions ---------------------------
84 
92  bool pause() {
93  return 1 == ++m_pauseCount;
94  }
95 
97 
104  bool resume();
105 
107 
113  template<typename T>
114  void push(const T& iAction);
115 
117 
124  template<typename T>
125  void pushAndWait(const T& iAction);
126 
128 
136  template<typename T>
137  tbb::task* pushAndGetNextTaskToRun(const T& iAction);
138 
139  private:
140  SerialTaskQueue(const SerialTaskQueue&) = delete;
141  const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
142 
144  class TaskBase : public tbb::task {
145  friend class SerialTaskQueue;
146  TaskBase(): m_queue(0) {}
147 
148  protected:
149  tbb::task* finishedTask();
150  private:
151  void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue;}
152 
154  };
155 
156  template< typename T>
157  class QueuedTask : public TaskBase {
158  public:
159  QueuedTask( const T& iAction):
160  m_action(iAction) {}
161 
162  private:
163  tbb::task* execute();
164 
166  };
167 
168  friend class TaskBase;
169 
170  void pushTask(TaskBase*);
171  tbb::task* pushAndGetNextTask(TaskBase*);
172  tbb::task* finishedTask();
173  //returns nullptr if a task is already being processed
175 
176  void pushAndWait(tbb::empty_task* iWait,TaskBase*);
177 
178 
179  // ---------- member data --------------------------------
180  tbb::concurrent_queue<TaskBase*> m_tasks;
181  std::atomic<bool> m_taskChosen;
182  std::atomic<unsigned long> m_pauseCount;
183  };
184 
185  template<typename T>
186  void SerialTaskQueue::push(const T& iAction) {
187  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
188  pTask->setQueue(this);
189  pushTask(pTask);
190  }
191 
192  template<typename T>
193  void SerialTaskQueue::pushAndWait(const T& iAction) {
194  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
195  waitTask->set_ref_count(2);
196  QueuedTask<T>* pTask{ new (waitTask->allocate_child()) QueuedTask<T>{iAction} };
197  pTask->setQueue(this);
198  pushAndWait(waitTask,pTask);
199  }
200 
201  template<typename T>
202  tbb::task* SerialTaskQueue::pushAndGetNextTaskToRun(const T& iAction) {
203  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
204  pTask->setQueue(this);
205  return pushAndGetNextTask(pTask);
206  }
207 
208  inline
209  tbb::task*
211 
212  template <typename T>
213  tbb::task*
215  try {
216  this->m_action();
217  } catch(...) {}
218  return this->finishedTask();
219  }
220 
221 }
222 
223 #endif
const SerialTaskQueue & operator=(const SerialTaskQueue &)=delete
void setQueue(SerialTaskQueue *iQueue)
tbb::task * finishedTask()
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
std::atomic< unsigned long > m_pauseCount
tbb::task * pushAndGetNextTask(TaskBase *)
void pushTask(TaskBase *)
bool resume()
Resumes processing if the queue was paused.
bool isPaused() const
Checks to see if the queue has been paused.
TaskBase * pickNextTask()
std::atomic< bool > m_taskChosen
HLT enums.
void push(const T &iAction)
asynchronously pushes functor iAction into queue
long double T
tbb::concurrent_queue< TaskBase * > m_tasks
bool pause()
Pauses processing of additional tasks from the queue.
tbb::task * pushAndGetNextTaskToRun(const T &iAction)
asynchronously pushes functor iAction into queue and finds next task to execute