CMS 3D CMS Logo

SerialTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueue_h
2 #define FWCore_Concurrency_SerialTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : SerialTaskQueue
7 //
49 //
50 // Original Author: Chris Jones
51 // Created: Thu Feb 21 11:14:39 CST 2013
52 // $Id$
53 //
54 
55 // system include files
56 #include <atomic>
57 #include <cassert>
58 
59 #include "tbb/task.h"
60 #include "tbb/concurrent_queue.h"
62 
63 // user include files
64 
65 // forward declarations
66 namespace edm {
68  public:
70 
// Initializer list and body of the move constructor. Its signature line
// (listing line 71) is not present in this listing; the cross-reference index
// below gives it as SerialTaskQueue(SerialTaskQueue&& iOther).
// Takes the task container from iOther via std::move, and copies iOther's
// flag and pause count while resetting them in iOther to false and 0.
72  : m_tasks(std::move(iOther.m_tasks)),
73  m_taskChosen(iOther.m_taskChosen.exchange(false)),
74  m_pauseCount(iOther.m_pauseCount.exchange(0)) {
// Checks that the newly constructed queue holds no tasks and has no task
// marked as chosen.
75  assert(m_tasks.empty() and m_taskChosen == false);
76  }
// Copying is disabled: both the copy constructor and the copy assignment
// operator are deleted.
77  SerialTaskQueue(const SerialTaskQueue&) = delete;
78  const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
79 
81 
82  // ---------- const member functions ---------------------
84 
/// Checks to see if the queue has been paused.
/// Returns true whenever the pause count is non-zero.
87  bool isPaused() const { return m_pauseCount.load() != 0; }
88 
89  // ---------- member functions ---------------------------
91 
/// Pauses processing of additional tasks from the queue.
/// Increments the atomic pause count and returns true only when this call
/// raised it to exactly 1.
99  bool pause() { return 1 == ++m_pauseCount; }
100 
102 
/// Resumes processing if the queue was paused.
/// NOTE(review): the definition is in SerialTaskQueue.cc (per the
/// cross-reference index); confirm there what the returned bool means.
109  bool resume();
110 
112 
/// Asynchronously pushes functor iAction into the queue.
/// iAction is invoked with no arguments when its task executes.
118  template <typename T>
119  void push(const T& iAction);
120 
122 
/// Synchronously pushes functor iAction into the queue.
/// iAction is invoked with no arguments when its task executes.
129  template <typename T>
130  void pushAndWait(const T& iAction);
131 
133 
/// Asynchronously pushes functor iAction into the queue and finds the next
/// task to execute, which is returned as a tbb::task pointer.
141  template <typename T>
142  tbb::task* pushAndGetNextTaskToRun(const T& iAction);
143 
144  private:
// Common base for the tasks held by the queue: a tbb::task that also stores a
// pointer to the SerialTaskQueue it was given via setQueue().
// Listing lines 151 and 156 are not present in this listing; the
// cross-reference index places the m_queue member at line 156.
146  class TaskBase : public tbb::task {
// SerialTaskQueue needs access to the private constructor and setQueue().
147  friend class SerialTaskQueue;
// A new task starts with no queue attached.
148  TaskBase() : m_queue(nullptr) {}
149 
150  protected:
152 
153  private:
// Records the queue pointer supplied by the caller.
154  void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue; }
155 
157  };
158 
// Task that wraps a user-supplied action of type T.
// Listing line 167 is not present in this listing; the cross-reference index
// places the `T m_action` member there.
159  template <typename T>
160  class QueuedTask : public TaskBase {
161  public:
// Stores a copy of iAction in m_action.
162  QueuedTask(const T& iAction) : m_action(iAction) {}
163 
164  private:
// Overrides tbb::task::execute(); defined later in this header.
165  tbb::task* execute() override;
166 
168  };
169 
// TaskBase is granted access to the private members of SerialTaskQueue.
170  friend class TaskBase;
171 
// Non-template helper behind push(); defined in SerialTaskQueue.cc per the
// cross-reference index.
172  void pushTask(TaskBase*);
// The comment below belongs to a declaration on listing line 176, which is not
// present in this listing.
175  //returns nullptr if a task is already being processed
177 
// Non-template helper behind the public pushAndWait(); receives the wait task
// and the queued task created there.
178  void pushAndWait(tbb::empty_task* iWait, TaskBase*);
179 
180  // ---------- member data --------------------------------
// Concurrent queue of pointers to the tasks held by this object.
181  tbb::concurrent_queue<TaskBase*> m_tasks;
// NOTE(review): presumably true while a task taken from the queue is being
// processed -- confirm against SerialTaskQueue.cc.
182  std::atomic<bool> m_taskChosen;
// Incremented by pause(); isPaused() reports true while it is non-zero.
183  std::atomic<unsigned long> m_pauseCount;
184  };
185 
// Asynchronously pushes functor iAction into the queue.
// Wraps a copy of iAction in a QueuedTask<T> allocated as a TBB root task,
// attaches this queue to it, and hands it to pushTask().
186  template <typename T>
187  void SerialTaskQueue::push(const T& iAction) {
188  QueuedTask<T>* pTask{new (tbb::task::allocate_root()) QueuedTask<T>{iAction}};
189  pTask->setQueue(this);
190  pushTask(pTask);
191  }
192 
// Synchronously pushes functor iAction into the queue.
// Creates an empty root task to wait on, wraps a copy of iAction in a
// QueuedTask<T> allocated as a child of that task, and passes both to the
// private pushAndWait(tbb::empty_task*, TaskBase*) overload.
193  template <typename T>
194  void SerialTaskQueue::pushAndWait(const T& iAction) {
195  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
// NOTE(review): presumably one reference for the child task plus one for the
// wait performed by the private overload -- confirm in SerialTaskQueue.cc.
196  waitTask->set_ref_count(2);
197  QueuedTask<T>* pTask{new (waitTask->allocate_child()) QueuedTask<T>{iAction}};
198  pTask->setQueue(this);
199  pushAndWait(waitTask, pTask);
200  }
201 
// Asynchronously pushes functor iAction into the queue and finds the next task
// to execute.
// The signature line (listing line 203) is not present in this listing; the
// cross-reference index gives it as
// tbb::task* pushAndGetNextTaskToRun(const T& iAction).
202  template <typename T>
// Wraps a copy of iAction in a QueuedTask<T> allocated as a TBB root task and
// attaches this queue to it.
204  QueuedTask<T>* pTask{new (tbb::task::allocate_root()) QueuedTask<T>{iAction}};
205  pTask->setQueue(this);
// Returns whatever pushAndGetNextTask() yields for the new task.
206  return pushAndGetNextTask(pTask);
207  }
208 
210 
// Runs the stored action and then returns the result of finishedTask().
// The signature line (listing line 212) is not present in this listing; the
// cross-reference index gives it as QueuedTask::execute(), returning
// tbb::task*.
211  template <typename T>
213  // Exception has to be swallowed in order to avoid throwing from execute(). The user of SerialTaskQueue should handle exceptions within m_action().
214  CMS_SA_ALLOW try { this->m_action(); } catch (...) {
215  }
216  return this->finishedTask();
217  }
218 
219 } // namespace edm
220 
221 #endif
edm::SerialTaskQueue::SerialTaskQueue
SerialTaskQueue()
Definition: SerialTaskQueue.h:69
edm::SerialTaskQueue::QueuedTask::QueuedTask
QueuedTask(const T &iAction)
Definition: SerialTaskQueue.h:162
funct::false
false
Definition: Factorize.h:29
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
cms::cuda::assert
assert(be >=bs)
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::SerialTaskQueue::pickNextTask
TaskBase * pickNextTask()
Definition: SerialTaskQueue.cc:68
edm::SerialTaskQueue::~SerialTaskQueue
~SerialTaskQueue()
Definition: SerialTaskQueue.cc:26
edm::SerialTaskQueue::m_pauseCount
std::atomic< unsigned long > m_pauseCount
Definition: SerialTaskQueue.h:183
edm::SerialTaskQueue::push
void push(const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:187
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::SerialTaskQueue::finishedTask
tbb::task * finishedTask()
Definition: SerialTaskQueue.cc:63
edm::SerialTaskQueue::TaskBase::TaskBase
TaskBase()
Definition: SerialTaskQueue.h:148
edm::SerialTaskQueue::pushAndGetNextTaskToRun
tbb::task * pushAndGetNextTaskToRun(const T &iAction)
asynchronously pushes functor iAction into queue and finds next task to execute
Definition: SerialTaskQueue.h:203
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
edm::SerialTaskQueue::TaskBase::setQueue
void setQueue(SerialTaskQueue *iQueue)
Definition: SerialTaskQueue.h:154
edm::SerialTaskQueue::pushAndWait
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:194
edm::SerialTaskQueue::pushAndGetNextTask
tbb::task * pushAndGetNextTask(TaskBase *)
Definition: SerialTaskQueue.cc:53
edm::SerialTaskQueue::TaskBase::m_queue
SerialTaskQueue * m_queue
Definition: SerialTaskQueue.h:156
edm::SerialTaskQueue::m_tasks
tbb::concurrent_queue< TaskBase * > m_tasks
Definition: SerialTaskQueue.h:181
thread_safety_macros.h
edm::SerialTaskQueue::QueuedTask::execute
tbb::task * execute() override
Definition: SerialTaskQueue.h:212
edm::SerialTaskQueue::QueuedTask::m_action
T m_action
Definition: SerialTaskQueue.h:167
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
edm::SerialTaskQueue::QueuedTask
Definition: SerialTaskQueue.h:160
T
long double T
Definition: Basic3DVectorLD.h:48
edm::SerialTaskQueue::m_taskChosen
std::atomic< bool > m_taskChosen
Definition: SerialTaskQueue.h:182
edm::SerialTaskQueue::SerialTaskQueue
SerialTaskQueue(SerialTaskQueue &&iOther)
Definition: SerialTaskQueue.h:71
edm::SerialTaskQueue::operator=
const SerialTaskQueue & operator=(const SerialTaskQueue &)=delete
edm::SerialTaskQueue::TaskBase
Definition: SerialTaskQueue.h:146
edm::SerialTaskQueue::pushTask
void pushTask(TaskBase *)
Definition: SerialTaskQueue.cc:46
edm::SerialTaskQueue::TaskBase::finishedTask
tbb::task * finishedTask()
Definition: SerialTaskQueue.h:209
edm::SerialTaskQueue::isPaused
bool isPaused() const
Checks to see if the queue has been paused.
Definition: SerialTaskQueue.h:87