CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Private Member Functions | Private Attributes | Friends
edm::SerialTaskQueue Class Reference

#include <SerialTaskQueue.h>

Classes

class  QueuedTask
 
class  TaskBase
 

Public Member Functions

bool isPaused () const
 Checks to see if the queue has been paused. More...
 
bool pause ()
 Pauses processing of additional tasks from the queue. More...
 
template<typename T >
void push (const T &iAction)
 asynchronously pushes functor iAction into queue More...
 
template<typename T >
tbb::task * pushAndGetNextTaskToRun (const T &iAction)
 asynchronously pushes functor iAction into queue and finds next task to execute More...
 
template<typename T >
void pushAndWait (const T &iAction)
 synchronously pushes functor iAction into queue More...
 
bool resume ()
 Resumes processing if the queue was paused. More...
 
 SerialTaskQueue ()
 
 SerialTaskQueue (SerialTaskQueue &&iOther)
 
 ~SerialTaskQueue ()
 

Private Member Functions

tbb::task * finishedTask ()
 
const SerialTaskQueueoperator= (const SerialTaskQueue &)=delete
 
TaskBasepickNextTask ()
 
tbb::task * pushAndGetNextTask (TaskBase *)
 
void pushAndWait (tbb::empty_task *iWait, TaskBase *)
 
void pushTask (TaskBase *)
 
 SerialTaskQueue (const SerialTaskQueue &)=delete
 

Private Attributes

std::atomic< unsigned long > m_pauseCount
 
std::atomic< bool > m_taskChosen
 
tbb::concurrent_queue< TaskBase * > m_tasks
 

Friends

class TaskBase
 

Detailed Description

Definition at line 67 of file SerialTaskQueue.h.

Constructor & Destructor Documentation

◆ SerialTaskQueue() [1/3]

edm::SerialTaskQueue::SerialTaskQueue ( )
inline

Definition at line 69 of file SerialTaskQueue.h.

69 : m_taskChosen(false), m_pauseCount{0} {}

◆ SerialTaskQueue() [2/3]

edm::SerialTaskQueue::SerialTaskQueue ( SerialTaskQueue &&  iOther)
inline

Definition at line 71 of file SerialTaskQueue.h.

72  : m_tasks(std::move(iOther.m_tasks)),
73  m_taskChosen(iOther.m_taskChosen.exchange(false)),
74  m_pauseCount(iOther.m_pauseCount.exchange(0)) {
75  assert(m_tasks.empty() and m_taskChosen == false);
76  }

References cms::cuda::assert(), m_taskChosen, and m_tasks.

◆ ~SerialTaskQueue()

SerialTaskQueue::~SerialTaskQueue ( )

Definition at line 26 of file SerialTaskQueue.cc.

26  {
27  //be certain all tasks have completed
28  bool isEmpty = m_tasks.empty();
29  bool isTaskChosen = m_taskChosen;
30  if ((not isEmpty and not isPaused()) or isTaskChosen) {
31  pushAndWait([]() { return; });
32  }
33 }

References isPaused(), m_taskChosen, m_tasks, or, and pushAndWait().

◆ SerialTaskQueue() [3/3]

edm::SerialTaskQueue::SerialTaskQueue ( const SerialTaskQueue )
privatedelete

Member Function Documentation

◆ finishedTask()

tbb::task * SerialTaskQueue::finishedTask ( )
private

Definition at line 63 of file SerialTaskQueue.cc.

63  {
64  m_taskChosen.store(false);
65  return pickNextTask();
66 }

References m_taskChosen, and pickNextTask().

Referenced by edm::SerialTaskQueue::QueuedTask< T >::execute(), and edm::SerialTaskQueue::TaskBase::finishedTask().

◆ isPaused()

bool edm::SerialTaskQueue::isPaused ( ) const
inline

Checks to see if the queue has been paused.

Returns
true if the queue is paused
See also
pause(), resume()

Definition at line 84 of file SerialTaskQueue.h.

84 { return m_pauseCount.load() != 0; }

References m_pauseCount.

Referenced by ~SerialTaskQueue().

◆ operator=()

const SerialTaskQueue& edm::SerialTaskQueue::operator= ( const SerialTaskQueue )
privatedelete

◆ pause()

bool edm::SerialTaskQueue::pause ( )
inline

Pauses processing of additional tasks from the queue.

Any task already running will not be paused however once that running task finishes no further tasks will be started. Multiple calls to pause() are allowed, however each call to pause() must be balanced by a call to resume().

Returns
false if queue was already paused.
See also
resume(), isPaused()

Definition at line 96 of file SerialTaskQueue.h.

96 { return 1 == ++m_pauseCount; }

References m_pauseCount.

Referenced by edm::EventProcessor::beginLumiAsync(), and edm::LimitedTaskQueue::Resumer::Resumer().

◆ pickNextTask()

SerialTaskQueue::TaskBase * SerialTaskQueue::pickNextTask ( )
private

Definition at line 68 of file SerialTaskQueue.cc.

68  {
69  bool expect = false;
70  if
71  LIKELY(0 == m_pauseCount and m_taskChosen.compare_exchange_strong(expect, true)) {
72  TaskBase* t = nullptr;
73  if
74  LIKELY(m_tasks.try_pop(t)) { return t; }
75  //no task was actually pulled
76  m_taskChosen.store(false);
77 
78  //was a new entry added after we called 'try_pop' but before we did the clear?
79  expect = false;
80  if (not m_tasks.empty() and m_taskChosen.compare_exchange_strong(expect, true)) {
81  t = nullptr;
82  if (m_tasks.try_pop(t)) {
83  return t;
84  }
85  //no task was still pulled since a different thread beat us to it
86  m_taskChosen.store(false);
87  }
88  }
89  return nullptr;
90 }

References LIKELY, m_pauseCount, m_taskChosen, m_tasks, and OrderedSet::t.

Referenced by finishedTask(), pushAndGetNextTask(), and resume().

◆ push()

template<typename T >
void SerialTaskQueue::push ( const T iAction)

asynchronously pushes functor iAction into queue

The function will return immediately and iAction will either process concurrently with the calling thread or wait until the protected resource becomes available or until a CPU becomes available.

Parameters
[in]iActionMust be a functor that takes no arguments and return no values.

Definition at line 187 of file SerialTaskQueue.h.

187  {
188  QueuedTask<T>* pTask{new (tbb::task::allocate_root()) QueuedTask<T>{iAction}};
189  pTask->setQueue(this);
190  pushTask(pTask);
191  }

References pushTask(), and edm::SerialTaskQueue::TaskBase::setQueue().

Referenced by edm::EventProcessor::beginLumiAsync().

◆ pushAndGetNextTask()

tbb::task * SerialTaskQueue::pushAndGetNextTask ( TaskBase iTask)
private

Definition at line 53 of file SerialTaskQueue.cc.

53  {
54  tbb::task* returnValue{nullptr};
55  if
56  LIKELY(nullptr != iTask) {
57  m_tasks.push(iTask);
58  returnValue = pickNextTask();
59  }
60  return returnValue;
61 }

References LIKELY, m_tasks, pickNextTask(), and TrackValidation_cff::task.

Referenced by pushAndGetNextTaskToRun(), pushAndWait(), and pushTask().

◆ pushAndGetNextTaskToRun()

template<typename T >
tbb::task * SerialTaskQueue::pushAndGetNextTaskToRun ( const T iAction)

asynchronously pushes functor iAction into queue and finds next task to execute

This function is useful if you are accessing the SerialTaskQueue for the execute() method of a TBB task and want to efficiently schedule the next task from the queue. In that case you can take the return value and return it directly from your execute() method. The function will return immediately and not wait for iAction to run.

Parameters
[in]iActionMust be a functor that takes no arguments and return no values.
Returns
Returns either the next task that the user must schedule with TBB or a nullptr.

Definition at line 203 of file SerialTaskQueue.h.

203  {
204  QueuedTask<T>* pTask{new (tbb::task::allocate_root()) QueuedTask<T>{iAction}};
205  pTask->setQueue(this);
206  return pushAndGetNextTask(pTask);
207  }

References pushAndGetNextTask(), and edm::SerialTaskQueue::TaskBase::setQueue().

◆ pushAndWait() [1/2]

template<typename T >
void SerialTaskQueue::pushAndWait ( const T iAction)

synchronously pushes functor iAction into queue

The function will wait until iAction has completed before returning. If another task is already running on the queue, the system is allowed to find another TBB task to execute while waiting for the iAction to finish. In that way the core is not idled while waiting.

Parameters
[in]iActionMust be a functor that takes no arguments and return no values.

Definition at line 194 of file SerialTaskQueue.h.

194  {
195  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
196  waitTask->set_ref_count(2);
197  QueuedTask<T>* pTask{new (waitTask->allocate_child()) QueuedTask<T>{iAction}};
198  pTask->setQueue(this);
199  pushAndWait(waitTask, pTask);
200  }

References edm::SerialTaskQueue::TaskBase::setQueue().

Referenced by ~SerialTaskQueue().

◆ pushAndWait() [2/2]

void SerialTaskQueue::pushAndWait ( tbb::empty_task *  iWait,
TaskBase iTask 
)
private

Definition at line 92 of file SerialTaskQueue.cc.

92  {
93  auto nextTask = pushAndGetNextTask(iTask);
94  if
95  LIKELY(nullptr != nextTask) {
96  if
97  LIKELY(nextTask == iTask) {
98  //spawn and wait for all requires the task to have its parent set
99  iWait->spawn_and_wait_for_all(*nextTask);
100  }
101  else {
102  tbb::task::spawn(*nextTask);
103  iWait->wait_for_all();
104  }
105  }
106  else {
107  //a task must already be running in this queue
108  iWait->wait_for_all();
109  }
110  tbb::task::destroy(*iWait);
111 }

References LIKELY, and pushAndGetNextTask().

◆ pushTask()

void SerialTaskQueue::pushTask ( TaskBase iTask)
private

Definition at line 46 of file SerialTaskQueue.cc.

46  {
47  tbb::task* t = pushAndGetNextTask(iTask);
48  if (nullptr != t) {
49  tbb::task::spawn(*t);
50  }
51 }

References pushAndGetNextTask(), OrderedSet::t, and TrackValidation_cff::task.

Referenced by push().

◆ resume()

bool SerialTaskQueue::resume ( )

Resumes processing if the queue was paused.

Multiple calls to resume() are allowed if there were multiple calls to pause(). Only when we reach as many resume() calls as pause() calls will the queue restart.

Returns
true if the call really restarts the queue
See also
pause(), isPaused()

Definition at line 35 of file SerialTaskQueue.cc.

35  {
36  if (0 == --m_pauseCount) {
38  if (nullptr != t) {
39  tbb::task::spawn(*t);
40  }
41  return true;
42  }
43  return false;
44 }

References m_pauseCount, pickNextTask(), OrderedSet::t, and TrackValidation_cff::task.

Referenced by edm::EventProcessor::globalEndLumiAsync(), edm::LimitedTaskQueue::Resumer::operator=(), and edm::Worker::RunModuleTask< T >::EnableQueueGuard::~EnableQueueGuard().

Friends And Related Function Documentation

◆ TaskBase

friend class TaskBase
friend

Definition at line 170 of file SerialTaskQueue.h.

Member Data Documentation

◆ m_pauseCount

std::atomic<unsigned long> edm::SerialTaskQueue::m_pauseCount
private

Definition at line 183 of file SerialTaskQueue.h.

Referenced by isPaused(), pause(), pickNextTask(), and resume().

◆ m_taskChosen

std::atomic<bool> edm::SerialTaskQueue::m_taskChosen
private

Definition at line 182 of file SerialTaskQueue.h.

Referenced by finishedTask(), pickNextTask(), SerialTaskQueue(), and ~SerialTaskQueue().

◆ m_tasks

tbb::concurrent_queue<TaskBase*> edm::SerialTaskQueue::m_tasks
private
cms::cuda::assert
assert(be >=bs)
edm::SerialTaskQueue::pickNextTask
TaskBase * pickNextTask()
Definition: SerialTaskQueue.cc:68
edm::SerialTaskQueue::m_pauseCount
std::atomic< unsigned long > m_pauseCount
Definition: SerialTaskQueue.h:183
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
OrderedSet.t
t
Definition: OrderedSet.py:90
edm::SerialTaskQueue::TaskBase
friend class TaskBase
Definition: SerialTaskQueue.h:170
edm::SerialTaskQueue::pushAndWait
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:194
edm::SerialTaskQueue::pushAndGetNextTask
tbb::task * pushAndGetNextTask(TaskBase *)
Definition: SerialTaskQueue.cc:53
edm::SerialTaskQueue::m_tasks
tbb::concurrent_queue< TaskBase * > m_tasks
Definition: SerialTaskQueue.h:181
eostools.move
def move(src, dest)
Definition: eostools.py:511
LIKELY
#define LIKELY(x)
Definition: Likely.h:20
edm::SerialTaskQueue::m_taskChosen
std::atomic< bool > m_taskChosen
Definition: SerialTaskQueue.h:182
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::SerialTaskQueue::pushTask
void pushTask(TaskBase *)
Definition: SerialTaskQueue.cc:46
edm::SerialTaskQueue::isPaused
bool isPaused() const
Checks to see if the queue has been paused.
Definition: SerialTaskQueue.h:84