CMS 3D CMS Logo

SerialTaskQueue.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : SerialTaskQueue
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 11:31:52 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 #include "tbb/task.h"
16 
17 // user include files
19 
21 
22 using namespace edm;
23 
24 //
25 // member functions
26 //
28  //be certain all tasks have completed
29  bool isEmpty = m_tasks.empty();
30  bool isTaskChosen = m_taskChosen;
31  if ((not isEmpty and not isPaused()) or isTaskChosen) {
32  tbb::task_group g;
33  g.run([&g, this]() {
34  tbb::task::suspend(
35  [&g, this](tbb::task::suspend_point tag) { push(g, [tag]() { tbb::task::resume(tag); }); }); //suspend
36  }); //group run
37  g.wait();
38  }
39 }
40 
42  auto pTask = &iTask;
43  iTask.group()->run([pTask, this]() {
44  TaskBase* t = pTask;
45  auto g = pTask->group();
46  do {
47  t->execute();
48  delete t;
49  t = finishedTask();
50  if (t and t->group() != g) {
51  spawn(*t);
52  t = nullptr;
53  }
54  } while (t != nullptr);
55  });
56 }
57 
59  if (0 == --m_pauseCount) {
60  auto t = pickNextTask();
61  if (nullptr != t) {
62  spawn(*t);
63  }
64  return true;
65  }
66  return false;
67 }
68 
70  auto t = pushAndGetNextTask(iTask);
71  if (nullptr != t) {
72  spawn(*t);
73  }
74 }
75 
77  TaskBase* returnValue{nullptr};
78  if LIKELY (nullptr != iTask) {
79  m_tasks.push(iTask);
80  returnValue = pickNextTask();
81  }
82  return returnValue;
83 }
84 
86  m_taskChosen.store(false);
87  return pickNextTask();
88 }
89 
91  bool expect = false;
92  if LIKELY (0 == m_pauseCount and m_taskChosen.compare_exchange_strong(expect, true)) {
93  TaskBase* t = nullptr;
94  if LIKELY (m_tasks.try_pop(t)) {
95  return t;
96  }
97  //no task was actually pulled
98  m_taskChosen.store(false);
99 
100  //was a new entry added after we called 'try_pop' but before we did the clear?
101  expect = false;
102  if (not m_tasks.empty() and m_taskChosen.compare_exchange_strong(expect, true)) {
103  t = nullptr;
104  if (m_tasks.try_pop(t)) {
105  return t;
106  }
107  //no task was still pulled since a different thread beat us to it
108  m_taskChosen.store(false);
109  }
110  }
111  return nullptr;
112 }
113 
114 //
115 // const member functions
116 //
117 
118 //
119 // static member functions
120 //
Likely.h
edm::SerialTaskQueue::spawn
void spawn(TaskBase &)
Definition: SerialTaskQueue.cc:41
edm::SerialTaskQueue::TaskBase::group
tbb::task_group * group()
Definition: SerialTaskQueue.h:126
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edm::SerialTaskQueue::push
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:167
edm::SerialTaskQueue::pushAndGetNextTask
TaskBase * pushAndGetNextTask(TaskBase *)
Definition: SerialTaskQueue.cc:76
edm::SerialTaskQueue::finishedTask
TaskBase * finishedTask()
Definition: SerialTaskQueue.cc:85
edm::SerialTaskQueue::pickNextTask
TaskBase * pickNextTask()
Definition: SerialTaskQueue.cc:90
edm::SerialTaskQueue::~SerialTaskQueue
~SerialTaskQueue()
Definition: SerialTaskQueue.cc:27
edm::SerialTaskQueue::m_pauseCount
std::atomic< unsigned long > m_pauseCount
Definition: SerialTaskQueue.h:163
SerialTaskQueue.h
GlobalPosition_Frontier_DevDB_cff.tag
tag
Definition: GlobalPosition_Frontier_DevDB_cff.py:11
edm::SerialTaskQueue::m_tasks
tbb::concurrent_queue< TaskBase * > m_tasks
Definition: SerialTaskQueue.h:161
LIKELY
#define LIKELY(x)
Definition: Likely.h:20
edm::SerialTaskQueue::m_taskChosen
std::atomic< bool > m_taskChosen
Definition: SerialTaskQueue.h:162
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::SerialTaskQueue::TaskBase
Definition: SerialTaskQueue.h:123
edm::SerialTaskQueue::pushTask
void pushTask(TaskBase *)
Definition: SerialTaskQueue.cc:69
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
g
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
edm::SerialTaskQueue::isPaused
bool isPaused() const
Checks to see if the queue has been paused.
Definition: SerialTaskQueue.h:87