CMS 3D CMS Logo

SerialTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueue_h
2 #define FWCore_Concurrency_SerialTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : SerialTaskQueue
7 //
49 //
50 // Original Author: Chris Jones
51 // Created: Thu Feb 21 11:14:39 CST 2013
52 // $Id$
53 //
54 
55 // system include files
56 #include <atomic>
57 #include <cassert>
58 
59 #include "oneapi/tbb/task_group.h"
60 #include "oneapi/tbb/concurrent_queue.h"
62 
63 // user include files
64 
65 // forward declarations
66 namespace edm {
68  public:
70 
72  : m_tasks(std::move(iOther.m_tasks)),
73  m_taskChosen(iOther.m_taskChosen.exchange(false)),
74  m_pauseCount(iOther.m_pauseCount.exchange(0)) {
75  assert(m_tasks.empty() and m_taskChosen == false);
76  }
77  SerialTaskQueue(const SerialTaskQueue&) = delete;
78  const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
79 
81 
82  // ---------- const member functions ---------------------
84 
87  bool isPaused() const { return m_pauseCount.load() != 0; }
88 
89  // ---------- member functions ---------------------------
91 
99  bool pause() { return 1 == ++m_pauseCount; }
100 
102 
109  bool resume();
110 
112 
118  template <typename T>
119  void push(oneapi::tbb::task_group&, const T& iAction);
120 
121  private:
123  class TaskBase {
124  friend class SerialTaskQueue;
125 
126  oneapi::tbb::task_group* group() { return m_group; }
127  virtual void execute() = 0;
128 
129  public:
130  virtual ~TaskBase() = default;
131 
132  protected:
133  explicit TaskBase(oneapi::tbb::task_group* iGroup) : m_group(iGroup) {}
134 
135  private:
136  oneapi::tbb::task_group* m_group;
137  };
138 
139  template <typename T>
140  class QueuedTask : public TaskBase {
141  public:
142  QueuedTask(oneapi::tbb::task_group& iGroup, const T& iAction) : TaskBase(&iGroup), m_action(iAction) {}
143 
144  private:
145  void execute() final;
146 
148  };
149 
151 
152  void pushTask(TaskBase*);
155  //returns nullptr if a task is already being processed
157 
158  void spawn(TaskBase&);
159 
160  // ---------- member data --------------------------------
161  oneapi::tbb::concurrent_queue<TaskBase*> m_tasks;
162  std::atomic<bool> m_taskChosen;
163  std::atomic<unsigned long> m_pauseCount;
164  };
165 
166  template <typename T>
167  void SerialTaskQueue::push(oneapi::tbb::task_group& iGroup, const T& iAction) {
168  QueuedTask<T>* pTask{new QueuedTask<T>{iGroup, iAction}};
169  pushTask(pTask);
170  }
171 
172  template <typename T>
174  // Exception has to swallowed in order to avoid throwing from execute(). The user of SerialTaskQueue should handle exceptions within m_action().
175  CMS_SA_ALLOW try { this->m_action(); } catch (...) {
176  }
177  }
178 
179 } // namespace edm
180 
181 #endif
TaskBase * pushAndGetNextTask(TaskBase *)
const SerialTaskQueue & operator=(const SerialTaskQueue &)=delete
#define CMS_SA_ALLOW
std::atomic< unsigned long > m_pauseCount
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
void pushTask(TaskBase *)
assert(be >=bs)
bool resume()
Resumes processing if the queue was paused.
QueuedTask(oneapi::tbb::task_group &iGroup, const T &iAction)
TaskBase * finishedTask()
bool isPaused() const
Checks to see if the queue has been paused.
def template(fileName, svg, replaceme="REPLACEME")
Definition: svgfig.py:521
SerialTaskQueue(SerialTaskQueue &&iOther)
TaskBase * pickNextTask()
std::atomic< bool > m_taskChosen
oneapi::tbb::concurrent_queue< TaskBase * > m_tasks
void spawn(TaskBase &)
HLT enums.
oneapi::tbb::task_group * m_group
long double T
oneapi::tbb::task_group * group()
TaskBase(oneapi::tbb::task_group *iGroup)
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511