CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
SerialTaskQueue.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : SerialTaskQueue
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 11:31:52 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 
16 // user include files
18 
20 
21 using namespace edm;
22 
23 //
24 // member functions
25 //
26 bool
28  if(0==--m_pauseCount) {
29  tbb::task* t = pickNextTask();
30  if(0 != t) {
31  tbb::task::spawn(*t);
32  }
33  return true;
34  }
35  return false;
36 }
37 
38 void
40  tbb::task* t = pushAndGetNextTask(iTask);
41  if(0!=t) {
42  tbb::task::spawn(*t);
43  }
44 }
45 
46 tbb::task*
48  tbb::task* returnValue{0};
49  if likely(0!=iTask) {
50  m_tasks.push(iTask);
51  returnValue = pickNextTask();
52  }
53  return returnValue;
54 }
55 
56 
57 tbb::task*
59  m_taskChosen.clear();
60  return pickNextTask();
61 }
62 
65 
66  if likely(0 == m_pauseCount and not m_taskChosen.test_and_set()) {
67  TaskBase* t=0;
68  if likely(m_tasks.try_pop(t)) {
69  return t;
70  }
71  //no task was actually pulled
72  m_taskChosen.clear();
73 
74  //was a new entry added after we called 'try_pop' but before we did the clear?
75  if(not m_tasks.empty() and not m_taskChosen.test_and_set()) {
76  TaskBase* t=0;
77  if(m_tasks.try_pop(t)) {
78  return t;
79  }
80  //no task was still pulled since a different thread beat us to it
81  m_taskChosen.clear();
82 
83  }
84  }
85  return 0;
86 }
87 
88 void SerialTaskQueue::pushAndWait(tbb::empty_task* iWait, TaskBase* iTask) {
89  auto nextTask = pushAndGetNextTask(iTask);
90  if likely(nullptr != nextTask) {
91  if likely(nextTask == iTask) {
92  //spawn and wait for all requires the task to have its parent set
93  iWait->spawn_and_wait_for_all(*nextTask);
94  } else {
95  tbb::task::spawn(*nextTask);
96  iWait->wait_for_all();
97  }
98  } else {
99  //a task must already be running in this queue
100  iWait->wait_for_all();
101  }
102  tbb::task::destroy(*iWait);
103 }
104 
105 
106 //
107 // const member functions
108 //
109 
110 //
111 // static member functions
112 //
tbb::task * finishedTask()
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
std::atomic< unsigned long > m_pauseCount
tbb::task * pushAndGetNextTask(TaskBase *)
void pushTask(TaskBase *)
#define likely(x)
bool resume()
Resumes processing if the queue was paused.
std::atomic_flag m_taskChosen
TaskBase * pickNextTask()
tbb::concurrent_queue< TaskBase * > m_tasks