CMS 3D CMS Logo

WaitingTaskList.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : WaitingTaskList
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 13:46:45 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 
16 // user include files
17 #include "tbb/task.h"
18 #include <cassert>
19 
23 
24 using namespace edm;
25 //
26 // constants, enums and typedefs
27 //
28 
29 //
30 // static data member definitions
31 //
32 
33 //
34 // constructors and destructor
35 //
// Builds an empty list that starts in the waiting state and pre-allocates
// iInitialSize WaitNode objects so that the first iInitialSize calls to
// createNode() need no heap allocation.
//
// iInitialSize: number of nodes to pre-allocate in the cache.
36 WaitingTaskList::WaitingTaskList(unsigned int iInitialSize)
37  : m_head{nullptr},
38  m_nodeCache{new WaitNode[iInitialSize]},
39  m_nodeCacheSize{iInitialSize},
// The cache index must start at zero: createNode() post-increments it and uses
// the old value as a zero-based offset into m_nodeCache. A default-constructed
// std::atomic<unsigned int> is not guaranteed to hold zero before C++20.
40  m_lastAssignedCacheIndex{0},
41  m_waiting{true} {
// Flag every pre-allocated node so announce() does not 'delete' it.
42  auto nodeCache = m_nodeCache.get();
43  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
44  it->m_fromCache = true;
45  }
46 }
47 
48 //
49 // member functions
50 //
// NOTE(review): these lines look like the body of WaitingTaskList::reset(); the
// signature line (listing line 51) is absent from this extract - confirm against
// the original file.
// Clears any stored exception, enlarges the node cache when more nodes were
// handed out than it can hold, and puts the list back into the waiting state.
52  m_exceptionPtr = std::exception_ptr{};
// Snapshot of the counter that createNode() increments on every call.
53  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
// NOTE(review): listing line 54 is absent from this extract, and nothing visible
// here sets m_lastAssignedCacheIndex back to zero - confirm against the original.
// The list is required to be empty at this point.
55  assert(m_head == nullptr);
56  if (nSeenTasks > m_nodeCacheSize) {
57  //need to expand so next time we don't have to do any
58  // memory requests
59  m_nodeCacheSize = nSeenTasks;
// Replaces the old cache array with a new one of nSeenTasks nodes.
60  m_nodeCache.reset(new WaitNode[nSeenTasks]);
61  auto nodeCache = m_nodeCache.get();
// Mark each new node as cache-owned so announce() will not 'delete' it.
62  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
63  it->m_fromCache = true;
64  }
65  }
66  //this will make sure all cores see the changes
// (m_waiting is a std::atomic<bool>; this is a store with the default memory order)
67  m_waiting = true;
68 }
69 
// NOTE(review): these lines look like the body of
// WaitingTaskList::createNode(WaitingTask* iTask); the signature line (listing
// line 70) is absent from this extract - confirm against the original file.
// Hands out a WaitNode for iTask: a slot from the pre-allocated cache while
// slots remain, otherwise a heap-allocated node. Returns the node with its task
// set and its next pointer referring to itself.
// The post-increment on the atomic counter claims an index for this call.
71  unsigned int index = m_lastAssignedCacheIndex++;
72 
73  WaitNode* returnValue;
74  if (index < m_nodeCacheSize) {
// Index is inside the cache: use the pre-allocated slot.
75  returnValue = m_nodeCache.get() + index;
76  } else {
// Cache exhausted: allocate on the heap and mark the node as not cache-owned,
// which is what makes announce() 'delete' it later.
77  returnValue = new WaitNode;
78  returnValue->m_fromCache = false;
79  }
80  returnValue->m_task = iTask;
81  //No other thread can see m_next yet. The caller to create node
82  // will be doing a synchronization operation anyway which will
83  // make sure m_task and m_next are synched across threads
// m_next is set to the node itself; announce() and presetTaskAsFailed() spin
// while nextNode() returns the node itself, i.e. until add() has linked it.
84  returnValue->m_next.store(returnValue, std::memory_order_relaxed);
85 
86  return returnValue;
87 }
88 
// NOTE(review): these lines look like the body of
// WaitingTaskList::add(WaitingTask* iTask); the signature line (listing line 89)
// is absent from this extract - confirm against the original file.
// Either releases iTask immediately (when the list is no longer waiting) or
// pushes it onto the head of the lock-free list to be released by announce().
// Take a reference on the task; it is dropped again either below or in announce().
90  iTask->increment_ref_count();
91  if (!m_waiting) {
// NOTE(review): listing line 93 is absent from this extract, so this branch
// appears empty here. announce() calls dependentTaskFailed(m_exceptionPtr) under
// the same condition - confirm what the original does at this point.
92  if (UNLIKELY(bool(m_exceptionPtr))) {
94  }
// Drop the reference taken above; spawn the task once its count reaches zero.
95  if (0 == iTask->decrement_ref_count()) {
96  tbb::task::spawn(*iTask);
97  }
98  } else {
99  WaitNode* newHead = createNode(iTask);
100  //This exchange is sequentially consistent thereby
101  // ensuring ordering between it and setNextNode
102  WaitNode* oldHead = m_head.exchange(newHead);
// Until this call completes, newHead's next pointer still refers to newHead
// itself (set in createNode), which list walkers treat as "not linked yet".
103  newHead->setNextNode(oldHead);
104 
105  //For the case where oldHead != nullptr,
106  // even if 'm_waiting' changed, we don't
107  // have to recheck since we beat 'announce()' in
108  // the ordering of 'm_head.exchange' call so iTask
109  // is guaranteed to be in the link list
110 
111  if (nullptr == oldHead) {
112  if (!m_waiting) {
113  //if finished waiting right before we did the
114  // exchange our task will not be spawned. Also,
115  // additional threads may be calling add() and swapping
116  // heads and linking us to the new head.
117  // It is safe to call announce from multiple threads
118  announce();
119  }
120  }
121  }
122 }
123 
// Calls dependentTaskFailed(iExcept) on every task currently in the list,
// without removing any node and without spawning any task.
// Does nothing when iExcept is empty or the list is no longer waiting.
//
// iExcept: the exception to hand to each listed task.
124 void WaitingTaskList::presetTaskAsFailed(std::exception_ptr iExcept) {
125  if (iExcept and m_waiting) {
// Walk the list from the current head; m_head itself is left untouched.
126  WaitNode* node = m_head.load();
127  while (node) {
128  WaitNode* next;
// A node whose nextNode() is the node itself has not been linked by add() yet
// (see the comment in announce()); spin until the link is in place.
129  while (node == (next = node->nextNode())) {
130  hardware_pause();
131  }
132  node->m_task->dependentTaskFailed(iExcept);
133  node = next;
134  }
135  }
136 }
137 
// NOTE(review): these lines look like the body of WaitingTaskList::announce();
// the signature line (listing line 138) is absent from this extract - confirm
// against the original file.
// Detaches the whole list in one atomic exchange, then releases every task in
// it: forwards the stored exception (if any), frees heap-allocated nodes, and
// spawns each task whose reference count drops to zero.
139  //Need a temporary storage since one of these tasks could
140  // cause the next event to start processing which would refill
141  // this waiting list after it has been reset
// After this exchange m_head is nullptr and 'n' is the only handle on the old list.
142  WaitNode* n = m_head.exchange(nullptr);
143  WaitNode* next;
144  while (n) {
145  //it is possible that 'WaitingTaskList::add' is running in a different
146  // thread and we have a new 'head' but the old head has not yet been
147  // attached to the new head (we identify this since 'nextNode' will return itself).
148  // In that case we have to wait until the link has been established before going on.
149  while (n == (next = n->nextNode())) {
150  hardware_pause();
151  }
// Copy the task pointer out now; the node may be deleted just below.
152  auto t = n->m_task;
153  if (UNLIKELY(bool(m_exceptionPtr))) {
154  t->dependentTaskFailed(m_exceptionPtr);
155  }
// Only nodes created with 'new' in createNode() are deleted; cache nodes are
// owned by m_nodeCache.
156  if (!n->m_fromCache) {
157  delete n;
158  }
159  n = next;
160 
161  //the task may indirectly call WaitingTaskList::reset
162  // so we need to call spawn after we are done using the node.
163  if (0 == t->decrement_ref_count()) {
164  tbb::task::spawn(*t);
165  }
166  }
167 }
168 
// Ends the waiting state and releases every task currently in the list.
//
// iPtr: exception to forward to the waiting tasks; an empty pointer means no
//       failure is reported (announce() only forwards a non-empty pointer).
169 void WaitingTaskList::doneWaiting(std::exception_ptr iPtr) {
// Order matters: the exception is stored before m_waiting is cleared, and add()
// reads m_exceptionPtr only after it has observed m_waiting == false.
170  m_exceptionPtr = iPtr;
171  m_waiting = false;
172  announce();
173 }
std::atomic< bool > m_waiting
void add(WaitingTask *)
Adds task to the waiting list.
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
void reset()
Resets access to the resource so that added tasks will wait.
std::atomic< WaitNode * > m_next
void dependentTaskFailed(std::exception_ptr iPtr)
Called if waited for task failed.
Definition: WaitingTask.h:59
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::unique_ptr< WaitNode[]> m_nodeCache
std::exception_ptr m_exceptionPtr
WaitingTaskList(unsigned int iInitialSize=2)
Constructor.
unsigned int m_nodeCacheSize
void presetTaskAsFailed(std::exception_ptr iExcept)
HLT enums.
void setNextNode(WaitNode *iNext)
#define UNLIKELY(x)
Definition: Likely.h:21
WaitNode * createNode(WaitingTask *iTask)