CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Private Member Functions | Private Attributes | Friends
edm::SerialTaskQueue Class Reference

#include <SerialTaskQueue.h>

Classes

class  QueuedTask
 
class  TaskBase
 

Public Member Functions

bool isPaused () const
 Checks to see if the queue has been paused. More...
 
const SerialTaskQueueoperator= (const SerialTaskQueue &)=delete
 
bool pause ()
 Pauses processing of additional tasks from the queue. More...
 
template<typename T >
void push (oneapi::tbb::task_group &, const T &iAction)
 asynchronously pushes functor iAction into queue More...
 
bool resume ()
 Resumes processing if the queue was paused. More...
 
 SerialTaskQueue ()
 
 SerialTaskQueue (SerialTaskQueue &&iOther)
 
 SerialTaskQueue (const SerialTaskQueue &)=delete
 
 ~SerialTaskQueue ()
 

Private Member Functions

TaskBasefinishedTask ()
 
TaskBasepickNextTask ()
 
TaskBasepushAndGetNextTask (TaskBase *)
 
void pushTask (TaskBase *)
 
void spawn (TaskBase &)
 

Private Attributes

std::atomic< unsigned long > m_pauseCount
 
std::atomic< bool > m_taskChosen
 
oneapi::tbb::concurrent_queue< TaskBase * > m_tasks
 

Friends

class TaskBase
 

Detailed Description

Definition at line 67 of file SerialTaskQueue.h.

Constructor & Destructor Documentation

◆ SerialTaskQueue() [1/3]

edm::SerialTaskQueue::SerialTaskQueue ( )
inline

Definition at line 69 of file SerialTaskQueue.h.

69 : m_taskChosen(false), m_pauseCount{0} {}
std::atomic< unsigned long > m_pauseCount
std::atomic< bool > m_taskChosen

◆ SerialTaskQueue() [2/3]

edm::SerialTaskQueue::SerialTaskQueue ( SerialTaskQueue &&  iOther)
inline

Definition at line 71 of file SerialTaskQueue.h.

References cms::cuda::assert(), m_taskChosen, and m_tasks.

72  : m_tasks(std::move(iOther.m_tasks)),
73  m_taskChosen(iOther.m_taskChosen.exchange(false)),
74  m_pauseCount(iOther.m_pauseCount.exchange(0)) {
75  assert(m_tasks.empty() and m_taskChosen == false);
76  }
std::atomic< unsigned long > m_pauseCount
assert(be >=bs)
std::atomic< bool > m_taskChosen
oneapi::tbb::concurrent_queue< TaskBase * > m_tasks
def move(src, dest)
Definition: eostools.py:511

◆ SerialTaskQueue() [3/3]

edm::SerialTaskQueue::SerialTaskQueue ( const SerialTaskQueue )
delete

◆ ~SerialTaskQueue()

SerialTaskQueue::~SerialTaskQueue ( )

Definition at line 27 of file SerialTaskQueue.cc.

References g, isPaused(), dqmdumpme::last, m_taskChosen, m_tasks, eostools::move(), or, and push().

27  {
28  //be certain all tasks have completed
29  bool isEmpty = m_tasks.empty();
30  bool isTaskChosen = m_taskChosen;
31  if ((not isEmpty and not isPaused()) or isTaskChosen) {
32  oneapi::tbb::task_group g;
33  tbb::task_handle last{g.defer([]() {})};
34  push(g, [&g, &last]() { g.run(std::move(last)); });
35  g.wait();
36  }
37 }
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool isPaused() const
Checks to see if the queue has been paused.
std::atomic< bool > m_taskChosen
oneapi::tbb::concurrent_queue< TaskBase * > m_tasks
def move(src, dest)
Definition: eostools.py:511

Member Function Documentation

◆ finishedTask()

SerialTaskQueue::TaskBase * SerialTaskQueue::finishedTask ( )
private

Definition at line 83 of file SerialTaskQueue.cc.

References m_taskChosen, and pickNextTask().

Referenced by spawn().

83  {
84  m_taskChosen.store(false);
85  return pickNextTask();
86 }
TaskBase * pickNextTask()
std::atomic< bool > m_taskChosen

◆ isPaused()

bool edm::SerialTaskQueue::isPaused ( ) const
inline

Checks to see if the queue has been paused.

Returns
true if the queue is paused
See also
pause(), resume()

Definition at line 87 of file SerialTaskQueue.h.

References m_pauseCount.

Referenced by ~SerialTaskQueue().

87 { return m_pauseCount.load() != 0; }
std::atomic< unsigned long > m_pauseCount

◆ operator=()

const SerialTaskQueue& edm::SerialTaskQueue::operator= ( const SerialTaskQueue )
delete

◆ pause()

bool edm::SerialTaskQueue::pause ( )
inline

Pauses processing of additional tasks from the queue.

Any task already running will not be paused however once that running task finishes no further tasks will be started. Multiple calls to pause() are allowed, however each call to pause() must be balanced by a call to resume().

Returns
false if queue was already paused.
See also
resume(), isPaused()

Definition at line 99 of file SerialTaskQueue.h.

References m_pauseCount.

Referenced by edm::LimitedTaskQueue::Resumer::Resumer().

99 { return 1 == ++m_pauseCount; }
std::atomic< unsigned long > m_pauseCount

◆ pickNextTask()

SerialTaskQueue::TaskBase * SerialTaskQueue::pickNextTask ( )
private

Definition at line 88 of file SerialTaskQueue.cc.

References LIKELY, m_pauseCount, m_taskChosen, m_tasks, and submitPVValidationJobs::t.

Referenced by finishedTask(), pushAndGetNextTask(), and resume().

88  {
89  bool expect = false;
90  if LIKELY (0 == m_pauseCount and m_taskChosen.compare_exchange_strong(expect, true)) {
91  TaskBase* t = nullptr;
92  if LIKELY (m_tasks.try_pop(t)) {
93  return t;
94  }
95  //no task was actually pulled
96  m_taskChosen.store(false);
97 
98  //was a new entry added after we called 'try_pop' but before we did the clear?
99  expect = false;
100  if (not m_tasks.empty() and m_taskChosen.compare_exchange_strong(expect, true)) {
101  t = nullptr;
102  if (m_tasks.try_pop(t)) {
103  return t;
104  }
105  //no task was still pulled since a different thread beat us to it
106  m_taskChosen.store(false);
107  }
108  }
109  return nullptr;
110 }
std::atomic< unsigned long > m_pauseCount
#define LIKELY(x)
Definition: Likely.h:20
std::atomic< bool > m_taskChosen
oneapi::tbb::concurrent_queue< TaskBase * > m_tasks

◆ push()

template<typename T >
void SerialTaskQueue::push ( oneapi::tbb::task_group &  iGroup,
const T iAction 
)

asynchronously pushes functor iAction into queue

The function will return immediately and iAction will either process concurrently with the calling thread or wait until the protected resource becomes available or until a CPU becomes available.

Parameters
[in]iActionMust be a functor that takes no arguments and return no values.

Definition at line 167 of file SerialTaskQueue.h.

References pushTask().

Referenced by edm::EventProcessor::beginLumiAsync(), edm::EventProcessor::beginRunAsync(), evf::GlobalEvFOutputEventWriter::doOutputEventAsync(), edm::EventProcessor::endRunAsync(), edm::eventsetup::ESSourceDataProxyNonConcurrentBase::prefetchAsyncImpl(), and ~SerialTaskQueue().

167  {
168  QueuedTask<T>* pTask{new QueuedTask<T>{iGroup, iAction}};
169  pushTask(pTask);
170  }
void pushTask(TaskBase *)

◆ pushAndGetNextTask()

SerialTaskQueue::TaskBase * SerialTaskQueue::pushAndGetNextTask ( TaskBase iTask)
private

Definition at line 74 of file SerialTaskQueue.cc.

References LIKELY, m_tasks, and pickNextTask().

Referenced by pushTask().

74  {
75  TaskBase* returnValue{nullptr};
76  if LIKELY (nullptr != iTask) {
77  m_tasks.push(iTask);
78  returnValue = pickNextTask();
79  }
80  return returnValue;
81 }
#define LIKELY(x)
Definition: Likely.h:20
TaskBase * pickNextTask()
oneapi::tbb::concurrent_queue< TaskBase * > m_tasks

◆ pushTask()

void SerialTaskQueue::pushTask ( TaskBase iTask)
private

Definition at line 67 of file SerialTaskQueue.cc.

References pushAndGetNextTask(), spawn(), and submitPVValidationJobs::t.

Referenced by push().

67  {
68  auto t = pushAndGetNextTask(iTask);
69  if (nullptr != t) {
70  spawn(*t);
71  }
72 }
TaskBase * pushAndGetNextTask(TaskBase *)
void spawn(TaskBase &)

◆ resume()

bool SerialTaskQueue::resume ( )

Resumes processing if the queue was paused.

Multiple calls to resume() are allowed if there were multiple calls to pause(). Only when we reach as many resume() calls as pause() calls will the queue restart.

Returns
true if the call really restarts the queue
See also
pause(), isPaused()

Definition at line 56 of file SerialTaskQueue.cc.

References m_pauseCount, pickNextTask(), spawn(), and submitPVValidationJobs::t.

Referenced by edm::EventProcessor::beginLumiAsync(), edm::EventProcessor::beginRunAsync(), edm::EventProcessor::globalEndLumiAsync(), edm::EventProcessor::globalEndRunAsync(), edm::LimitedTaskQueue::Resumer::operator=(), edm::EventProcessor::releaseBeginRunResources(), and edm::Worker::RunModuleTask< T >::EnableQueueGuard::~EnableQueueGuard().

56  {
57  if (0 == --m_pauseCount) {
58  auto t = pickNextTask();
59  if (nullptr != t) {
60  spawn(*t);
61  }
62  return true;
63  }
64  return false;
65 }
std::atomic< unsigned long > m_pauseCount
TaskBase * pickNextTask()
void spawn(TaskBase &)

◆ spawn()

void SerialTaskQueue::spawn ( TaskBase iTask)
private

Definition at line 39 of file SerialTaskQueue.cc.

References finishedTask(), g, edm::SerialTaskQueue::TaskBase::group(), and submitPVValidationJobs::t.

Referenced by pushTask(), and resume().

39  {
40  auto pTask = &iTask;
41  iTask.group()->run([pTask, this]() {
42  TaskBase* t = pTask;
43  auto g = pTask->group();
44  do {
45  t->execute();
46  delete t;
47  t = finishedTask();
48  if (t and t->group() != g) {
49  spawn(*t);
50  t = nullptr;
51  }
52  } while (t != nullptr);
53  });
54 }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
TaskBase * finishedTask()
void spawn(TaskBase &)

Friends And Related Function Documentation

◆ TaskBase

friend class TaskBase
friend

Definition at line 150 of file SerialTaskQueue.h.

Member Data Documentation

◆ m_pauseCount

std::atomic<unsigned long> edm::SerialTaskQueue::m_pauseCount
private

Definition at line 163 of file SerialTaskQueue.h.

Referenced by isPaused(), pause(), pickNextTask(), and resume().

◆ m_taskChosen

std::atomic<bool> edm::SerialTaskQueue::m_taskChosen
private

Definition at line 162 of file SerialTaskQueue.h.

Referenced by finishedTask(), pickNextTask(), SerialTaskQueue(), and ~SerialTaskQueue().

◆ m_tasks

oneapi::tbb::concurrent_queue<TaskBase*> edm::SerialTaskQueue::m_tasks
private