CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WaitingTaskList.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : WaitingTaskList
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 13:46:45 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 
16 // user include files
17 #include "tbb/task.h"
18 #include <cassert>
19 
22 
23 using namespace edm;
24 //
25 // constants, enums and typedefs
26 //
27 
28 //
29 // static data member definitions
30 //
31 
32 //
33 // constructors and destructor
34 //
35 WaitingTaskList::WaitingTaskList(unsigned int iInitialSize):
36 m_head{nullptr},
37 m_nodeCache{new WaitNode[iInitialSize]},
38 m_nodeCacheSize{iInitialSize},
39 m_lastAssignedCacheIndex{0},
40 m_waiting{true}
41 {
42  auto nodeCache = m_nodeCache.get();
43  for(auto it = nodeCache, itEnd = nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
44  it->m_fromCache=true;
45  }
46 }
47 
48 //
49 // member functions
50 //
51 void
53 {
54  m_exceptionPtr = std::exception_ptr{};
55  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
57  assert(m_head == nullptr);
58  if (nSeenTasks > m_nodeCacheSize) {
59  //need to expand so next time we don't have to do any
60  // memory requests
61  m_nodeCacheSize = nSeenTasks;
62  m_nodeCache.reset( new WaitNode[nSeenTasks] );
63  auto nodeCache = m_nodeCache.get();
64  for(auto it = nodeCache, itEnd = nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
65  it->m_fromCache=true;
66  }
67  }
68  //this will make sure all cores see the changes
69  m_waiting = true;
70 }
71 
74 {
75  unsigned int index = m_lastAssignedCacheIndex++;
76 
77  WaitNode* returnValue;
78  if( index < m_nodeCacheSize) {
79  returnValue = m_nodeCache.get()+index;
80  } else {
81  returnValue = new WaitNode;
82  returnValue->m_fromCache=false;
83  }
84  returnValue->m_task = iTask;
85  returnValue->m_next = returnValue;
86 
87  return returnValue;
88 }
89 
90 
91 void
93  iTask->increment_ref_count();
94  if(!m_waiting) {
95  if(m_exceptionPtr) {
97  }
98  if(0==iTask->decrement_ref_count()) {
99  tbb::task::spawn(*iTask);
100  }
101  } else {
102  WaitNode* newHead = createNode(iTask);
103  WaitNode* oldHead = m_head.exchange(newHead);
104  newHead->setNextNode(oldHead);
105 
106  //For the case where oldHead != nullptr,
107  // even if 'm_waiting' changed, we don't
108  // have to recheck since we beat 'announce()' in
109  // the ordering of 'm_head.exchange' call so iTask
110  // is guaranteed to be in the link list
111 
112  if(nullptr == oldHead) {
113  if(!m_waiting) {
114  //if finished waiting right before we did the
115  // exchange our task will not be spawned. Also,
116  // additional threads may be calling add() and swapping
117  // heads and linking us to the new head.
118  // It is safe to call announce from multiple threads
119  announce();
120  }
121  }
122  }
123 }
124 
125 void
127 {
128  //Need a temporary storage since one of these tasks could
129  // cause the next event to start processing which would refill
130  // this waiting list after it has been reset
131  WaitNode* n = m_head.exchange(nullptr);
132  WaitNode* next;
133  while(n) {
134  //it is possible that 'WaitingTaskList::add' is running in a different
135  // thread and we have a new 'head' but the old head has not yet been
136  // attached to the new head (we identify this since 'nextNode' will return itself).
137  // In that case we have to wait until the link has been established before going on.
138  while(n == (next=n->nextNode())) {
139  hardware_pause();
140  }
141  auto t = n->m_task;
142  if(m_exceptionPtr) {
143  t->dependentTaskFailed(m_exceptionPtr);
144  }
145  if(0==t->decrement_ref_count()){
146  tbb::task::spawn(*t);
147  }
148  if(!n->m_fromCache ) {
149  delete n;
150  }
151  n=next;
152  }
153 }
154 
155 void
156 WaitingTaskList::doneWaiting(std::exception_ptr iPtr)
157 {
158  m_exceptionPtr = iPtr;
159  m_waiting=false;
160  announce();
161 }
std::atomic< bool > m_waiting
void add(WaitingTask *)
Adds task to the waiting list.
assert(m_qm.get())
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
void reset()
Resets access to the resource so that added tasks will wait.
std::atomic< WaitNode * > m_next
void dependentTaskFailed(std::exception_ptr iPtr)
Called if waited for task failed.
Definition: WaitingTask.h:62
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::unique_ptr< WaitNode[]> m_nodeCache
std::exception_ptr m_exceptionPtr
WaitingTaskList(unsigned int iInitialSize=2)
Constructor.
unsigned int m_nodeCacheSize
void setNextNode(WaitNode *iNext)
WaitNode * createNode(WaitingTask *iTask)