test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
SerialTaskQueue.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : SerialTaskQueue
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 11:31:52 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 
16 // user include files
18 
20 
21 using namespace edm;
22 
23 //
24 // member functions
25 //
26 bool
28  if(0==--m_pauseCount) {
29  tbb::task* t = pickNextTask();
30  if(0 != t) {
31  tbb::task::spawn(*t);
32  }
33  return true;
34  }
35  return false;
36 }
37 
38 void
40  tbb::task* t = pushAndGetNextTask(iTask);
41  if(0!=t) {
42  tbb::task::spawn(*t);
43  }
44 }
45 
46 tbb::task*
48  tbb::task* returnValue{0};
49  if likely(0!=iTask) {
50  m_tasks.push(iTask);
51  returnValue = pickNextTask();
52  }
53  return returnValue;
54 }
55 
56 
57 tbb::task*
59  m_taskChosen.store(false);
60  return pickNextTask();
61 }
62 
65 
66  bool expect = false;
67  if likely(0 == m_pauseCount and m_taskChosen.compare_exchange_strong(expect,true)) {
68  TaskBase* t=0;
69  if likely(m_tasks.try_pop(t)) {
70  return t;
71  }
72  //no task was actually pulled
73  m_taskChosen.store(false);
74 
75  //was a new entry added after we called 'try_pop' but before we did the clear?
76  expect = false;
77  if(not m_tasks.empty() and m_taskChosen.compare_exchange_strong(expect,true)) {
78  TaskBase* t=0;
79  if(m_tasks.try_pop(t)) {
80  return t;
81  }
82  //no task was still pulled since a different thread beat us to it
83  m_taskChosen.store(false);
84 
85  }
86  }
87  return 0;
88 }
89 
90 void SerialTaskQueue::pushAndWait(tbb::empty_task* iWait, TaskBase* iTask) {
91  auto nextTask = pushAndGetNextTask(iTask);
92  if likely(nullptr != nextTask) {
93  if likely(nextTask == iTask) {
94  //spawn and wait for all requires the task to have its parent set
95  iWait->spawn_and_wait_for_all(*nextTask);
96  } else {
97  tbb::task::spawn(*nextTask);
98  iWait->wait_for_all();
99  }
100  } else {
101  //a task must already be running in this queue
102  iWait->wait_for_all();
103  }
104  tbb::task::destroy(*iWait);
105 }
106 
107 
108 //
109 // const member functions
110 //
111 
112 //
113 // static member functions
114 //
tbb::task * finishedTask()
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
std::atomic< unsigned long > m_pauseCount
tbb::task * pushAndGetNextTask(TaskBase *)
void pushTask(TaskBase *)
#define likely(x)
bool resume()
Resumes processing if the queue was paused.
TaskBase * pickNextTask()
std::atomic< bool > m_taskChosen
tbb::concurrent_queue< TaskBase * > m_tasks