CMS 3D CMS Logo

SerialTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueue_h
2 #define FWCore_Concurrency_SerialTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : SerialTaskQueue
7 //
49 //
50 // Original Author: Chris Jones
51 // Created: Thu Feb 21 11:14:39 CST 2013
52 // $Id$
53 //
54 
55 // system include files
56 #include <atomic>
57 #include <cassert>
58 
59 #include "tbb/task.h"
60 #include "tbb/concurrent_queue.h"
61 
62 // user include files
63 
64 // forward declarations
65 namespace edm {
67  public:
69 
71  : m_tasks(std::move(iOther.m_tasks)),
72  m_taskChosen(iOther.m_taskChosen.exchange(false)),
73  m_pauseCount(iOther.m_pauseCount.exchange(0)) {
74  assert(m_tasks.empty() and m_taskChosen == false);
75  }
77 
78  // ---------- const member functions ---------------------
80 
83  bool isPaused() const { return m_pauseCount.load() != 0; }
84 
85  // ---------- member functions ---------------------------
87 
95  bool pause() { return 1 == ++m_pauseCount; }
96 
98 
105  bool resume();
106 
108 
114  template <typename T>
115  void push(const T& iAction);
116 
118 
125  template <typename T>
126  void pushAndWait(const T& iAction);
127 
129 
137  template <typename T>
138  tbb::task* pushAndGetNextTaskToRun(const T& iAction);
139 
140  private:
141  SerialTaskQueue(const SerialTaskQueue&) = delete;
142  const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
143 
145  class TaskBase : public tbb::task {
146  friend class SerialTaskQueue;
148 
149  protected:
151 
152  private:
153  void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue; }
154 
156  };
157 
158  template <typename T>
159  class QueuedTask : public TaskBase {
160  public:
161  QueuedTask(const T& iAction) : m_action(iAction) {}
162 
163  private:
164  tbb::task* execute() override;
165 
167  };
168 
169  friend class TaskBase;
170 
171  void pushTask(TaskBase*);
174  //returns nullptr if a task is already being processed
176 
177  void pushAndWait(tbb::empty_task* iWait, TaskBase*);
178 
179  // ---------- member data --------------------------------
180  tbb::concurrent_queue<TaskBase*> m_tasks;
181  std::atomic<bool> m_taskChosen;
182  std::atomic<unsigned long> m_pauseCount;
183  };
184 
185  template <typename T>
186  void SerialTaskQueue::push(const T& iAction) {
187  QueuedTask<T>* pTask{new (tbb::task::allocate_root()) QueuedTask<T>{iAction}};
188  pTask->setQueue(this);
189  pushTask(pTask);
190  }
191 
192  template <typename T>
193  void SerialTaskQueue::pushAndWait(const T& iAction) {
194  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
195  waitTask->set_ref_count(2);
196  QueuedTask<T>* pTask{new (waitTask->allocate_child()) QueuedTask<T>{iAction}};
197  pTask->setQueue(this);
198  pushAndWait(waitTask, pTask);
199  }
200 
201  template <typename T>
203  QueuedTask<T>* pTask{new (tbb::task::allocate_root()) QueuedTask<T>{iAction}};
204  pTask->setQueue(this);
205  return pushAndGetNextTask(pTask);
206  }
207 
209 
210  template <typename T>
212  try {
213  this->m_action();
214  } catch (...) {
215  }
216  return this->finishedTask();
217  }
218 
219 } // namespace edm
220 
221 #endif
const SerialTaskQueue & operator=(const SerialTaskQueue &)=delete
void setQueue(SerialTaskQueue *iQueue)
tbb::task * finishedTask()
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
std::atomic< unsigned long > m_pauseCount
tbb::task * pushAndGetNextTask(TaskBase *)
#define nullptr
void pushTask(TaskBase *)
bool resume()
Resumes processing if the queue was paused.
bool isPaused() const
Checks to see if the queue has been paused.
SerialTaskQueue(SerialTaskQueue &&iOther)
tbb::task * execute() override
TaskBase * pickNextTask()
std::atomic< bool > m_taskChosen
HLT enums.
void push(const T &iAction)
asynchronously pushes functor iAction into queue
long double T
tbb::concurrent_queue< TaskBase * > m_tasks
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511
tbb::task * pushAndGetNextTaskToRun(const T &iAction)
asynchronously pushes functor iAction into queue and finds next task to execute