CMS 3D CMS Logo

WaitingTaskList.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : WaitingTaskList
5 //
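6 // Implementation:
7 // Singly linked list of waiting tasks whose head is updated by atomic exchange; nodes come from a preallocated cache, with heap allocation as the fallback.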
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 13:46:45 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 
16 // user include files
17 #include "tbb/task.h"
18 #include <cassert>
19 
23 
24 using namespace edm;
25 //
26 // constants, enums and typedefs
27 //
28 
29 //
30 // static data member definitions
31 //
32 
33 //
34 // constructors and destructor
35 //
36 WaitingTaskList::WaitingTaskList(unsigned int iInitialSize):
37 m_head{nullptr},
38 m_nodeCache{new WaitNode[iInitialSize]},
39 m_nodeCacheSize{iInitialSize},
41 m_waiting{true}
42 {
43  auto nodeCache = m_nodeCache.get();
44  for(auto it = nodeCache, itEnd = nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
45  it->m_fromCache=true;
46  }
47 }
48 
49 //
50 // member functions
51 //
52 void
54 {
55  m_exceptionPtr = std::exception_ptr{};
56  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
58  assert(m_head == nullptr);
59  if (nSeenTasks > m_nodeCacheSize) {
60  //need to expand so next time we don't have to do any
61  // memory requests
62  m_nodeCacheSize = nSeenTasks;
63  m_nodeCache.reset( new WaitNode[nSeenTasks] );
64  auto nodeCache = m_nodeCache.get();
65  for(auto it = nodeCache, itEnd = nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
66  it->m_fromCache=true;
67  }
68  }
69  //this will make sure all cores see the changes
70  m_waiting = true;
71 }
72 
75 {
76  unsigned int index = m_lastAssignedCacheIndex++;
77 
78  WaitNode* returnValue;
79  if( index < m_nodeCacheSize) {
80  returnValue = m_nodeCache.get()+index;
81  } else {
82  returnValue = new WaitNode;
83  returnValue->m_fromCache=false;
84  }
85  returnValue->m_task = iTask;
86  //No other thread can see m_next yet. The caller to create node
87  // will be doing a synchronization operation anyway which will
88  // make sure m_task and m_next are synched across threads
89  returnValue->m_next.store(returnValue,std::memory_order_relaxed);
90 
91  return returnValue;
92 }
93 
94 
95 void
97  iTask->increment_ref_count();
98  if(!m_waiting) {
99  if(UNLIKELY(bool(m_exceptionPtr))) {
101  }
102  if(0==iTask->decrement_ref_count()) {
103  tbb::task::spawn(*iTask);
104  }
105  } else {
106  WaitNode* newHead = createNode(iTask);
107  //This exchange is sequentially consistent thereby
108  // ensuring ordering between it and setNextNode
109  WaitNode* oldHead = m_head.exchange(newHead);
110  newHead->setNextNode(oldHead);
111 
112  //For the case where oldHead != nullptr,
113  // even if 'm_waiting' changed, we don't
114  // have to recheck since we beat 'announce()' in
115  // the ordering of 'm_head.exchange' call so iTask
116  // is guaranteed to be in the link list
117 
118  if(nullptr == oldHead) {
119  if(!m_waiting) {
120  //if finished waiting right before we did the
121  // exchange our task will not be spawned. Also,
122  // additional threads may be calling add() and swapping
123  // heads and linking us to the new head.
124  // It is safe to call announce from multiple threads
125  announce();
126  }
127  }
128  }
129 }
130 
131 void
132 WaitingTaskList::presetTaskAsFailed(std::exception_ptr iExcept) {
133  if(iExcept and m_waiting) {
134  WaitNode* node = m_head.load();
135  while(node) {
136  WaitNode* next;
137  while(node == (next=node->nextNode())) {
138  hardware_pause();
139  }
140  node->m_task->dependentTaskFailed(iExcept);
141  node = next;
142  }
143  }
144 }
145 
146 void
148 {
149  //Need a temporary storage since one of these tasks could
150  // cause the next event to start processing which would refill
151  // this waiting list after it has been reset
152  WaitNode* n = m_head.exchange(nullptr);
153  WaitNode* next;
154  while(n) {
155  //it is possible that 'WaitingTaskList::add' is running in a different
156  // thread and we have a new 'head' but the old head has not yet been
157  // attached to the new head (we identify this since 'nextNode' will return itself).
158  // In that case we have to wait until the link has been established before going on.
159  while(n == (next=n->nextNode())) {
160  hardware_pause();
161  }
162  auto t = n->m_task;
163  if(UNLIKELY(bool(m_exceptionPtr))) {
164  t->dependentTaskFailed(m_exceptionPtr);
165  }
166  if(0==t->decrement_ref_count()){
167  tbb::task::spawn(*t);
168  }
169  if(!n->m_fromCache ) {
170  delete n;
171  }
172  n=next;
173  }
174 }
175 
176 void
177 WaitingTaskList::doneWaiting(std::exception_ptr iPtr)
178 {
179  m_exceptionPtr = iPtr;
180  m_waiting=false;
181  announce();
182 }
std::atomic< bool > m_waiting
void add(WaitingTask *)
Adds task to the waiting list.
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
void reset()
Resets access to the resource so that added tasks will wait.
std::atomic< WaitNode * > m_next
void dependentTaskFailed(std::exception_ptr iPtr)
Called if waited for task failed.
Definition: WaitingTask.h:64
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::unique_ptr< WaitNode[]> m_nodeCache
std::exception_ptr m_exceptionPtr
WaitingTaskList(unsigned int iInitialSize=2)
Constructor.
unsigned int m_nodeCacheSize
void presetTaskAsFailed(std::exception_ptr iExcept)
HLT enums.
void setNextNode(WaitNode *iNext)
WaitNode * createNode(WaitingTask *iTask)
#define UNLIKELY(x)