CMS 3D CMS Logo

WaitingTaskList.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : WaitingTaskList
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 13:46:45 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 
16 // user include files
17 #include "oneapi/tbb/task.h"
18 #include <cassert>
19 #include <memory>
20 
25 
26 using namespace edm;
27 //
28 // constants, enums and typedefs
29 //
30 
31 //
32 // static data member definitions
33 //
34 
35 //
36 // constructors and destructor
37 //
38 WaitingTaskList::WaitingTaskList(unsigned int iInitialSize)
39  : m_head{nullptr},
40  m_nodeCache{new WaitNode[iInitialSize]},
41  m_nodeCacheSize{iInitialSize},
42  m_lastAssignedCacheIndex{0},
43  m_waiting{true} {
44  auto nodeCache = m_nodeCache.get();
45  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
46  it->m_fromCache = true;
47  }
48 }
49 
50 //
51 // member functions
52 //
// Presumably the body of WaitingTaskList::reset(): it puts the list back into
// the "waiting" state so that tasks added afterwards are queued again.
// NOTE(review): the listing numbers jump 51 -> 54 and 55 -> 57, i.e. the
// signature line and one statement of this function are missing from this
// extract. Restore them from the original file; do not treat the text below
// as the complete function.
//
// Clear any exception left over from the previous round.
54  m_exceptionPtr = std::exception_ptr{};
// m_lastAssignedCacheIndex is bumped once per createNode() call, so this is
// the number of nodes that were requested since the counter was last zeroed.
55  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
// The list must already be drained before it is re-armed.
57  assert(m_head == nullptr);
58  if (nSeenTasks > m_nodeCacheSize) {
59  //need to expand so next time we don't have to do any
60  // memory requests
61  m_nodeCacheSize = nSeenTasks;
// Assigning to the unique_ptr releases the previous cache array.
62  m_nodeCache = std::make_unique<WaitNode[]>(nSeenTasks);
63  auto nodeCache = m_nodeCache.get();
// Flag the fresh nodes as cache-owned, exactly as the constructor does.
64  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
65  it->m_fromCache = true;
66  }
67  }
68  //this will make sure all cores see the changes
// m_waiting is a std::atomic<bool>; plain assignment is a sequentially
// consistent store.
69  m_waiting = true;
70 }
71 
72 WaitingTaskList::WaitNode* WaitingTaskList::createNode(oneapi::tbb::task_group* iGroup, WaitingTask* iTask) {
73  unsigned int index = m_lastAssignedCacheIndex++;
74 
75  WaitNode* returnValue;
76  if (index < m_nodeCacheSize) {
77  returnValue = m_nodeCache.get() + index;
78  } else {
79  returnValue = new WaitNode;
80  returnValue->m_fromCache = false;
81  }
82  returnValue->m_task = iTask;
83  returnValue->m_group = iGroup;
84  //No other thread can see m_next yet. The caller to create node
85  // will be doing a synchronization operation anyway which will
86  // make sure m_task and m_next are synched across threads
87  returnValue->m_next.store(returnValue, std::memory_order_relaxed);
88 
89  return returnValue;
90 }
91 
// Presumably the body of the add() overload whose iTask is a holder object
// (it is used through release_no_decrement() and group() below).
// NOTE(review): listing lines 92 (the signature) and 95 are missing from this
// extract. Restore them from the original file before relying on this text.
//
// Fast path: the list is no longer waiting, so nothing is queued.
93  if (!m_waiting) {
94  if (m_exceptionPtr) {
// NOTE(review): the statement of this branch (listing line 95) is absent here.
96  }
97  } else {
// Take the raw task out of the holder; the node stores the raw pointer.
98  auto task = iTask.release_no_decrement();
99  WaitNode* newHead = createNode(iTask.group(), task);
100  //This exchange is sequentially consistent thereby
101  // ensuring ordering between it and setNextNode
102  WaitNode* oldHead = m_head.exchange(newHead);
// Until this call the new node's next pointer refers to itself, which is
// what the draining loop spins on.
103  newHead->setNextNode(oldHead);
104 
105  //For the case where oldHead != nullptr,
106  // even if 'm_waiting' changed, we don't
107  // have to recheck since we beat 'announce()' in
108  // the ordering of 'm_head.exchange' call so iTask
109  // is guaranteed to be in the link list
110 
111  if (nullptr == oldHead) {
// NOTE(review): the next pointer was already set to oldHead (nullptr in this
// branch) just above, so this second store looks redundant; the other add()
// overload in this file has no such line. Once the first store is visible, a
// concurrent drain may already be done with a non-cache node and delete it --
// confirm this write cannot touch a freed node.
112  newHead->setNextNode(nullptr);
113  if (!m_waiting) {
114  //if finished waiting right before we did the
115  // exchange our task will not be run. Also,
116  // additional threads may be calling add() and swapping
117  // heads and linking us to the new head.
118  // It is safe to call announce from multiple threads
119  announce();
120  }
121  }
122  }
123 }
124 
// Adds iTask to the waiting list, or runs it in iGroup right away when the
// list is no longer waiting.
//
// iGroup: task group used to run the task once its reference count hits zero.
// iTask:  task to queue; its reference count is incremented on entry and is
//         decremented again when the task is released.
// NOTE(review): listing line 129 is missing from this extract (see below).
125 void WaitingTaskList::add(oneapi::tbb::task_group* iGroup, WaitingTask* iTask) {
// Hold a reference for as long as the list may still refer to the task.
126  iTask->increment_ref_count();
127  if (!m_waiting) {
128  if (UNLIKELY(bool(m_exceptionPtr))) {
// NOTE(review): the statement of this branch (listing line 129) is absent
// from this extract; as shown the branch is empty. Restore it from the
// original file.
130  }
// Drop the reference taken above; if it was the last one, schedule the task.
131  if (0 == iTask->decrement_ref_count()) {
132  iGroup->run([iTask]() {
133  TaskSentry s{iTask};
134  iTask->execute();
135  });
136  }
137  } else {
138  WaitNode* newHead = createNode(iGroup, iTask);
139  //This exchange is sequentially consistent thereby
140  // ensuring ordering between it and setNextNode
141  WaitNode* oldHead = m_head.exchange(newHead);
// Until this call the new node's next pointer refers to itself, which is
// what the draining loop spins on.
142  newHead->setNextNode(oldHead);
143 
144  //For the case where oldHead != nullptr,
145  // even if 'm_waiting' changed, we don't
146  // have to recheck since we beat 'announce()' in
147  // the ordering of 'm_head.exchange' call so iTask
148  // is guaranteed to be in the link list
149 
150  if (nullptr == oldHead) {
151  if (!m_waiting) {
152  //if finished waiting right before we did the
153  // exchange our task will not be run. Also,
154  // additional threads may be calling add() and swapping
155  // heads and linking us to the new head.
156  // It is safe to call announce from multiple threads
157  announce();
158  }
159  }
160  }
161 }
162 
163 void WaitingTaskList::presetTaskAsFailed(std::exception_ptr iExcept) {
164  if (iExcept and m_waiting) {
165  WaitNode* node = m_head.load();
166  while (node) {
167  WaitNode* next;
168  while (node == (next = node->nextNode())) {
169  hardware_pause();
170  }
171  node->m_task->dependentTaskFailed(iExcept);
172  node = next;
173  }
174  }
175 }
176 
// Presumably the body of WaitingTaskList::announce(): it detaches the whole
// list and releases every queued task.
// NOTE(review): listing line 177 (the signature) is missing from this
// extract. Restore it from the original file.
//
178  //Need a temporary storage since one of these tasks could
179  // cause the next event to start processing which would refill
180  // this waiting list after it has been reset
// Take ownership of the current chain and leave the list empty in one step.
181  WaitNode* n = m_head.exchange(nullptr);
182  WaitNode* next;
183  while (n) {
184  //it is possible that 'WaitingTaskList::add' is running in a different
185  // thread and we have a new 'head' but the old head has not yet been
186  // attached to the new head (we identify this since 'nextNode' will return itself).
187  // In that case we have to wait until the link has been established before going on.
188  while (n == (next = n->nextNode())) {
189  hardware_pause();
190  }
// Copy out everything needed from the node before the node is retired.
191  auto t = n->m_task;
192  auto g = n->m_group;
193  if (UNLIKELY(bool(m_exceptionPtr))) {
194  t->dependentTaskFailed(m_exceptionPtr);
195  }
// Only nodes that createNode() allocated individually are deleted here;
// cache nodes stay owned by m_nodeCache.
196  if (!n->m_fromCache) {
197  delete n;
198  }
199  n = next;
200 
201  //the task may indirectly call WaitingTaskList::reset
202  // so we need to call spawn after we are done using the node.
// Drop the list's reference; if it was the last one, schedule the task.
203  if (0 == t->decrement_ref_count()) {
204  g->run([t]() {
205  TaskSentry s{t};
206  t->execute();
207  });
208  }
209  }
210 }
211 
212 void WaitingTaskList::doneWaiting(std::exception_ptr iPtr) {
213  m_exceptionPtr = iPtr;
214  m_waiting = false;
215  announce();
216 }
oneapi::tbb::task_group * m_group
WaitNode * createNode(oneapi::tbb::task_group *iGroup, WaitingTask *iTask)
std::atomic< bool > m_waiting
virtual void execute()=0
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
void reset()
Resets access to the resource so that added tasks will wait.
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
std::atomic< WaitNode * > m_next
void dependentTaskFailed(std::exception_ptr iPtr)
Called if waited for task failed.
Definition: WaitingTask.h:59
oneapi::tbb::task_group * group() const noexcept
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
WaitingTask * release_no_decrement() noexcept
void doneWaiting(std::exception_ptr iExcept)
std::unique_ptr< WaitNode[]> m_nodeCache
std::exception_ptr m_exceptionPtr
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
WaitingTaskList(unsigned int iInitialSize=2)
Constructor.
unsigned int m_nodeCacheSize
void presetTaskAsFailed(std::exception_ptr iExcept)
HLT enums.
void setNextNode(WaitNode *iNext)
unsigned int decrement_ref_count()
Definition: TaskBase.h:42
#define UNLIKELY(x)
Definition: Likely.h:21
void increment_ref_count()
Definition: TaskBase.h:41