CMS 3D CMS Logo

SerialTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_SerialTaskQueue_h
2 #define FWCore_Concurrency_SerialTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : SerialTaskQueue
7 //
49 //
50 // Original Author: Chris Jones
51 // Created: Thu Feb 21 11:14:39 CST 2013
52 // $Id$
53 //
54 
55 // system include files
56 #include <atomic>
57 #include <cassert>
58 
59 #include "tbb/task_group.h"
60 #include "tbb/concurrent_queue.h"
62 
63 // user include files
64 
65 // forward declarations
66 namespace edm {
68  public:
70 
72  : m_tasks(std::move(iOther.m_tasks)),
73  m_taskChosen(iOther.m_taskChosen.exchange(false)),
74  m_pauseCount(iOther.m_pauseCount.exchange(0)) {
75  assert(m_tasks.empty() and m_taskChosen == false);
76  }
77  SerialTaskQueue(const SerialTaskQueue&) = delete;
78  const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
79 
81 
82  // ---------- const member functions ---------------------
84 
87  bool isPaused() const { return m_pauseCount.load() != 0; }
88 
89  // ---------- member functions ---------------------------
91 
99  bool pause() { return 1 == ++m_pauseCount; }
100 
102 
109  bool resume();
110 
112 
118  template <typename T>
119  void push(tbb::task_group&, const T& iAction);
120 
121  private:
123  class TaskBase {
124  friend class SerialTaskQueue;
125 
126  tbb::task_group* group() { return m_group; }
127  virtual void execute() = 0;
128 
129  public:
130  virtual ~TaskBase() = default;
131 
132  protected:
133  explicit TaskBase(tbb::task_group* iGroup) : m_group(iGroup) {}
134 
135  private:
136  tbb::task_group* m_group;
137  };
138 
139  template <typename T>
140  class QueuedTask : public TaskBase {
141  public:
142  QueuedTask(tbb::task_group& iGroup, const T& iAction) : TaskBase(&iGroup), m_action(iAction) {}
143 
144  private:
145  void execute() final;
146 
148  };
149 
150  friend class TaskBase;
151 
152  void pushTask(TaskBase*);
155  //returns nullptr if a task is already being processed
157 
158  void spawn(TaskBase&);
159 
160  // ---------- member data --------------------------------
161  tbb::concurrent_queue<TaskBase*> m_tasks;
162  std::atomic<bool> m_taskChosen;
163  std::atomic<unsigned long> m_pauseCount;
164  };
165 
166  template <typename T>
167  void SerialTaskQueue::push(tbb::task_group& iGroup, const T& iAction) {
168  QueuedTask<T>* pTask{new QueuedTask<T>{iGroup, iAction}};
169  pushTask(pTask);
170  }
171 
172  template <typename T>
174  // Exception has to swallowed in order to avoid throwing from execute(). The user of SerialTaskQueue should handle exceptions within m_action().
175  CMS_SA_ALLOW try { this->m_action(); } catch (...) {
176  }
177  }
178 
179 } // namespace edm
180 
181 #endif
edm::SerialTaskQueue::spawn
void spawn(TaskBase &)
Definition: SerialTaskQueue.cc:41
edm::SerialTaskQueue::SerialTaskQueue
SerialTaskQueue()
Definition: SerialTaskQueue.h:69
funct::false
false
Definition: Factorize.h:29
edm::SerialTaskQueue::TaskBase::group
tbb::task_group * group()
Definition: SerialTaskQueue.h:126
edm::SerialTaskQueue::TaskBase::m_group
tbb::task_group * m_group
Definition: SerialTaskQueue.h:136
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
cms::cuda::assert
assert(be >=bs)
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
edm::SerialTaskQueue::push
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:167
watchdog.const
const
Definition: watchdog.py:83
edm::SerialTaskQueue::pushAndGetNextTask
TaskBase * pushAndGetNextTask(TaskBase *)
Definition: SerialTaskQueue.cc:76
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::SerialTaskQueue::finishedTask
TaskBase * finishedTask()
Definition: SerialTaskQueue.cc:85
edm::SerialTaskQueue::TaskBase::~TaskBase
virtual ~TaskBase()=default
edm::SerialTaskQueue::pickNextTask
TaskBase * pickNextTask()
Definition: SerialTaskQueue.cc:90
edm::SerialTaskQueue::~SerialTaskQueue
~SerialTaskQueue()
Definition: SerialTaskQueue.cc:27
edm::SerialTaskQueue::m_pauseCount
std::atomic< unsigned long > m_pauseCount
Definition: SerialTaskQueue.h:163
edm::SerialTaskQueue::TaskBase::TaskBase
TaskBase(tbb::task_group *iGroup)
Definition: SerialTaskQueue.h:133
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
edm::SerialTaskQueue::QueuedTask::execute
void execute() final
Definition: SerialTaskQueue.h:173
edm::SerialTaskQueue::TaskBase::execute
virtual void execute()=0
edm::SerialTaskQueue::m_tasks
tbb::concurrent_queue< TaskBase * > m_tasks
Definition: SerialTaskQueue.h:161
thread_safety_macros.h
svgfig.template
def template(fileName, svg, replaceme="REPLACEME")
Definition: svgfig.py:521
edm::SerialTaskQueue::QueuedTask::m_action
T m_action
Definition: SerialTaskQueue.h:147
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
edm::SerialTaskQueue::QueuedTask
Definition: SerialTaskQueue.h:140
T
long double T
Definition: Basic3DVectorLD.h:48
edm::SerialTaskQueue::m_taskChosen
std::atomic< bool > m_taskChosen
Definition: SerialTaskQueue.h:162
edm::SerialTaskQueue::SerialTaskQueue
SerialTaskQueue(SerialTaskQueue &&iOther)
Definition: SerialTaskQueue.h:71
edm::SerialTaskQueue::operator=
const SerialTaskQueue & operator=(const SerialTaskQueue &)=delete
edm::SerialTaskQueue::TaskBase
Definition: SerialTaskQueue.h:123
edm::SerialTaskQueue::QueuedTask::QueuedTask
QueuedTask(tbb::task_group &iGroup, const T &iAction)
Definition: SerialTaskQueue.h:142
edm::SerialTaskQueue::pushTask
void pushTask(TaskBase *)
Definition: SerialTaskQueue.cc:69
edm::SerialTaskQueue::isPaused
bool isPaused() const
Checks to see if the queue has been paused.
Definition: SerialTaskQueue.h:87