edm::SerialTaskQueue Class Reference

#include <SerialTaskQueue.h>

Classes

class  QueuedTask
 
class  TaskBase
 

Public Member Functions

bool isPaused () const
 Checks to see if the queue has been paused. More...
 
bool pause ()
 Pauses processing of additional tasks from the queue. More...
 
template<typename T >
void push (const T &iAction)
 asynchronously pushes functor iAction into queue More...
 
template<typename T >
tbb::task * pushAndGetNextTaskToRun (const T &iAction)
 asynchronously pushes functor iAction into queue and finds next task to execute More...
 
template<typename T >
void pushAndWait (const T &iAction)
 synchronously pushes functor iAction into queue More...
 
bool resume ()
 Resumes processing if the queue was paused. More...
 
 SerialTaskQueue ()
 

Private Member Functions

tbb::task * finishedTask ()
 
const SerialTaskQueue & operator= (const SerialTaskQueue &)=delete
 
TaskBase * pickNextTask ()
 
tbb::task * pushAndGetNextTask (TaskBase *)
 
void pushAndWait (tbb::empty_task *iWait, TaskBase *)
 
void pushTask (TaskBase *)
 
 SerialTaskQueue (const SerialTaskQueue &)=delete
 

Private Attributes

std::atomic< unsigned long > m_pauseCount
 
std::atomic_flag m_taskChosen
 
tbb::concurrent_queue< TaskBase * > m_tasks
 

Friends

class TaskBase
 

Detailed Description

Definition at line 65 of file SerialTaskQueue.h.

Constructor & Destructor Documentation

edm::SerialTaskQueue::SerialTaskQueue ( )
inline

Definition at line 68 of file SerialTaskQueue.h.

68  :
69  m_taskChosen{ATOMIC_FLAG_INIT},
70  m_pauseCount{0}
71  { }
edm::SerialTaskQueue::SerialTaskQueue ( const SerialTaskQueue & )
private delete

Member Function Documentation

tbb::task * SerialTaskQueue::finishedTask ( )
private

Definition at line 58 of file SerialTaskQueue.cc.

References m_taskChosen, and pickNextTask().

Referenced by edm::SerialTaskQueue::QueuedTask< T >::execute(), and edm::SerialTaskQueue::TaskBase::finishedTask().

58  {
59  m_taskChosen.clear();
60  return pickNextTask();
61 }
bool edm::SerialTaskQueue::isPaused ( ) const
inline

Checks to see if the queue has been paused.

Returns
true if the queue is paused
See Also
pause(), resume()

Definition at line 78 of file SerialTaskQueue.h.

References m_pauseCount.

78 { return m_pauseCount.load()!=0;}
const SerialTaskQueue& edm::SerialTaskQueue::operator= ( const SerialTaskQueue & )
private delete
bool edm::SerialTaskQueue::pause ( )
inline

Pauses processing of additional tasks from the queue.

A task that is already running is not interrupted, but once it finishes no further tasks are started. pause() may be called multiple times; each call must be balanced by a call to resume().

Returns
false if queue was already paused.
See Also
resume(), isPaused()

Definition at line 90 of file SerialTaskQueue.h.

References m_pauseCount.

90  {
91  return 1 == ++m_pauseCount;
92  }
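
The counting behaviour described above can be illustrated with a minimal, standalone sketch. PauseCounter is a hypothetical name and is not part of CMSSW; it models only the m_pauseCount bookkeeping and holds no tasks. It assumes isPaused() should report a non-zero count, as the documented return value states.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the pause bookkeeping only (no task queue).
class PauseCounter {
public:
  // True only for the call that moves the count from 0 to 1,
  // i.e. the call that actually pauses the queue.
  bool pause() { return 1 == ++count_; }

  // True only for the call that moves the count back to 0,
  // i.e. the call that actually restarts the queue.
  bool resume() {
    assert(count_.load() > 0 && "resume() without a matching pause()");
    return 0 == --count_;
  }

  // Assumption: "paused" means at least one unbalanced pause() call.
  bool isPaused() const { return count_.load() != 0; }

private:
  std::atomic<unsigned long> count_{0};
};
```

Because the counter is unsigned, a resume() without a matching pause() would wrap around instead of going negative, which is why the sketch asserts on that case.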
SerialTaskQueue::TaskBase * SerialTaskQueue::pickNextTask ( )
private

Definition at line 64 of file SerialTaskQueue.cc.

References likely, m_pauseCount, m_taskChosen, and m_tasks.

Referenced by finishedTask(), pushAndGetNextTask(), and resume().

64  {
65 
66   if likely(0 == m_pauseCount and not m_taskChosen.test_and_set()) {
67     TaskBase* t=0;
68     if likely(m_tasks.try_pop(t)) {
69       return t;
70     }
71     //no task was actually pulled
72     m_taskChosen.clear();
73 
74     //was a new entry added after we called 'try_pop' but before we did the clear?
75     if(not m_tasks.empty() and not m_taskChosen.test_and_set()) {
76       TaskBase* t=0;
77       if(m_tasks.try_pop(t)) {
78         return t;
79       }
80       //no task was still pulled since a different thread beat us to it
81       m_taskChosen.clear();
82 
83     }
84   }
85   return 0;
86 }
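
The selection logic in pickNextTask() can be modelled without TBB. The sketch below is an illustration under stated assumptions, not the CMSSW implementation: TaskGate, add(), pickNext(), finished() and demoGate() are hypothetical names, plain ints stand in for TaskBase pointers, a mutex-guarded std::deque replaces tbb::concurrent_queue, and the pause check is left out. It shows the std::atomic_flag acting as a gate that at most one caller holds, and the second look at the queue after a failed pop.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical model of the "one task in flight" gate.
class TaskGate {
public:
  void add(int id) {
    std::lock_guard<std::mutex> lock(mutex_);
    tasks_.push_back(id);
  }

  // Returns true and fills `out` if the caller won the right to run a task.
  bool pickNext(int& out) {
    if (chosen_.test_and_set()) return false;  // another task is in flight
    if (tryPop(out)) return true;              // gate stays held while it runs
    chosen_.clear();                           // nothing to run: release the gate
    // An entry may have been added between the failed pop and the clear.
    if (!isEmpty() && !chosen_.test_and_set()) {
      if (tryPop(out)) return true;
      chosen_.clear();  // another caller took it first
    }
    return false;
  }

  // Called when the running task completes; hands back the next one, if any.
  bool finished(int& out) {
    chosen_.clear();
    return pickNext(out);
  }

private:
  bool tryPop(int& out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (tasks_.empty()) return false;
    out = tasks_.front();
    tasks_.pop_front();
    return true;
  }
  bool isEmpty() {
    std::lock_guard<std::mutex> lock(mutex_);
    return tasks_.empty();
  }

  std::mutex mutex_;
  std::deque<int> tasks_;
  std::atomic_flag chosen_ = ATOMIC_FLAG_INIT;
};

// Usage sketch: returns the ids in the order the gate released them.
std::vector<int> demoGate() {
  TaskGate gate;
  gate.add(1);
  gate.add(2);
  std::vector<int> ran;
  int id = 0;
  bool more = gate.pickNext(id);
  while (more) {
    int other = 0;
    const bool second = gate.pickNext(other);  // gate is held, so this fails
    assert(!second);
    (void)second;
    ran.push_back(id);
    more = gate.finished(id);  // release the gate and fetch the next id
  }
  return ran;
}
```

The flag is deliberately left set when a task is returned; in this model it is only cleared again by finished(), mirroring how finishedTask() clears m_taskChosen before picking the next task.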
template<typename T >
void SerialTaskQueue::push ( const T & iAction)

asynchronously pushes functor iAction into queue

The function returns immediately. iAction then either runs concurrently with the calling thread or waits until the protected resource, or a CPU, becomes available.

Parameters
[in] iAction  Must be a functor that takes no arguments and returns no value.

Definition at line 184 of file SerialTaskQueue.h.

References pushTask().

184  {
185  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
186  pTask->setQueue(this);
187  pushTask(pTask);
188  }
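
A rough way to see the serialisation that push() provides is a small model that runs the functors inline instead of spawning TBB tasks. This is a sketch under stated assumptions, not the CMSSW class: MiniSerialQueue, drain() and demoOrder() are hypothetical names, and std::function with a mutex-guarded std::deque replaces QueuedTask and tbb::concurrent_queue. Unlike the real class, which hands work to the TBB scheduler, this model executes queued work on whichever caller currently owns the queue.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical model: at most one functor runs at a time, in FIFO order.
class MiniSerialQueue {
public:
  void push(std::function<void()> action) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(action));
    }
    drain();
  }

private:
  void drain() {
    for (;;) {
      // Someone else owns the queue; that owner will run our functor later.
      if (running_.test_and_set()) return;
      std::function<void()> task;
      const bool got = tryPop(task);
      if (got) task();
      running_.clear();
      // Stop only if nothing was run and nothing arrived in the meantime.
      if (!got && isEmpty()) return;
    }
  }
  bool tryPop(std::function<void()>& out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (tasks_.empty()) return false;
    out = std::move(tasks_.front());
    tasks_.pop_front();
    return true;
  }
  bool isEmpty() {
    std::lock_guard<std::mutex> lock(mutex_);
    return tasks_.empty();
  }

  std::mutex mutex_;
  std::deque<std::function<void()>> tasks_;
  std::atomic_flag running_ = ATOMIC_FLAG_INIT;
};

// Usage sketch: a functor that pushes more work while it is running.
std::vector<int> demoOrder() {
  MiniSerialQueue queue;
  std::vector<int> order;
  queue.push([&] {
    order.push_back(1);
    queue.push([&] { order.push_back(3); });  // queued, not run re-entrantly
    order.push_back(2);
  });
  assert(order.size() == 3);
  return order;
}
```

In this model a functor pushed from inside a running functor is deferred until the running one has finished, which is the ordering guarantee a serial queue is meant to give for a protected resource.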
tbb::task * SerialTaskQueue::pushAndGetNextTask ( TaskBase * iTask)
private

Definition at line 47 of file SerialTaskQueue.cc.

References likely, m_tasks, and pickNextTask().

Referenced by pushAndGetNextTaskToRun(), pushAndWait(), and pushTask().

47  {
48  tbb::task* returnValue{0};
49  if likely(0!=iTask) {
50  m_tasks.push(iTask);
51  returnValue = pickNextTask();
52  }
53  return returnValue;
54 }
template<typename T >
tbb::task * SerialTaskQueue::pushAndGetNextTaskToRun ( const T & iAction)

asynchronously pushes functor iAction into queue and finds next task to execute

This function is useful if you are accessing the SerialTaskQueue from the execute() method of a TBB task and want to schedule the next task from the queue efficiently. In that case you can return this function's return value directly from your execute() method. The function returns immediately and does not wait for iAction to run.

Parameters
[in] iAction  Must be a functor that takes no arguments and returns no value.
Returns
Returns either the next task that the user must schedule with TBB or a nullptr.

Definition at line 200 of file SerialTaskQueue.h.

References pushAndGetNextTask().

200  {
201  QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
202  pTask->setQueue(this);
203  return pushAndGetNextTask(pTask);
204  }
template<typename T >
void SerialTaskQueue::pushAndWait ( const T & iAction)

synchronously pushes functor iAction into queue

The function waits until iAction has completed before returning. If another task is already running on the queue, the system is allowed to find another TBB task to execute while waiting for iAction to finish, so the core does not sit idle.

Parameters
[in] iAction  Must be a functor that takes no arguments and returns no value.

Definition at line 191 of file SerialTaskQueue.h.

191  {
192  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
193  waitTask->set_ref_count(2);
194  QueuedTask<T>* pTask{ new (waitTask->allocate_child()) QueuedTask<T>{iAction} };
195  pTask->setQueue(this);
196  pushAndWait(waitTask,pTask);
197  }
void SerialTaskQueue::pushAndWait ( tbb::empty_task *  iWait,
TaskBase * iTask
)
private

Definition at line 88 of file SerialTaskQueue.cc.

References likely, and pushAndGetNextTask().

88  {
89   auto nextTask = pushAndGetNextTask(iTask);
90   if likely(nullptr != nextTask) {
91     if likely(nextTask == iTask) {
92       //spawn and wait for all requires the task to have its parent set
93       iWait->spawn_and_wait_for_all(*nextTask);
94     } else {
95       tbb::task::spawn(*nextTask);
96       iWait->wait_for_all();
97     }
98   } else {
99     //a task must already be running in this queue
100     iWait->wait_for_all();
101   }
102   tbb::task::destroy(*iWait);
103 }
void SerialTaskQueue::pushTask ( TaskBase * iTask)
private

Definition at line 39 of file SerialTaskQueue.cc.

References pushAndGetNextTask().

Referenced by push().

39  {
40  tbb::task* t = pushAndGetNextTask(iTask);
41  if(0!=t) {
42  tbb::task::spawn(*t);
43  }
44 }
bool SerialTaskQueue::resume ( )

Resumes processing if the queue was paused.

Multiple calls to resume() are allowed if there were multiple calls to pause(). Only when we reach as many resume() calls as pause() calls will the queue restart.

Returns
true if the call really restarts the queue
See Also
pause(), isPaused()

Definition at line 27 of file SerialTaskQueue.cc.

References m_pauseCount, and pickNextTask().

27  {
28  if(0==--m_pauseCount) {
29  tbb::task* t = pickNextTask();
30  if(0 != t) {
31  tbb::task::spawn(*t);
32  }
33  return true;
34  }
35  return false;
36 }

Friends And Related Function Documentation

friend class TaskBase
friend

Definition at line 166 of file SerialTaskQueue.h.

Member Data Documentation

std::atomic<unsigned long> edm::SerialTaskQueue::m_pauseCount
private

Definition at line 180 of file SerialTaskQueue.h.

Referenced by isPaused(), pause(), pickNextTask(), and resume().

std::atomic_flag edm::SerialTaskQueue::m_taskChosen
private

Definition at line 179 of file SerialTaskQueue.h.

Referenced by finishedTask(), and pickNextTask().

tbb::concurrent_queue<TaskBase*> edm::SerialTaskQueue::m_tasks
private

Definition at line 178 of file SerialTaskQueue.h.

Referenced by pickNextTask(), and pushAndGetNextTask().