CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WaitingTaskList.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Concurrency
4 // Class : WaitingTaskList
5 //
6 // Implementation:
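7 // Waiting tasks are kept in a singly linked list whose head is swapped with an atomic exchange; nodes come from a preallocated array, and are allocated individually once that array is exhausted.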
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Feb 21 13:46:45 CST 2013
11 // $Id$
12 //
13 
14 // system include files
15 
16 // user include files
17 #include "tbb/task.h"
18 #include <cassert>
19 
22 
23 using namespace edm;
24 //
25 // constants, enums and typedefs
26 //
27 
28 //
29 // static data member definitions
30 //
31 
32 //
33 // constructors and destructor
34 //
// Constructs an empty list that starts in the "waiting" state (m_waiting is
// true) with a preallocated array of nodes.
//   iInitialSize: number of WaitNode objects allocated up front in m_nodeCache
//                 (the declaration summary in this listing shows a default of 2).
35 WaitingTaskList::WaitingTaskList(unsigned int iInitialSize):
36 m_head{0},
37 m_nodeCache{new WaitNode[iInitialSize]},
38 m_nodeCacheSize{iInitialSize},
39 m_lastAssignedCacheIndex{0},
40 m_waiting{true}
41 {
// Tag every preallocated node as belonging to the cache: announce() deletes
// only nodes whose m_fromCache flag is false, so these are never deleted
// one at a time.
42  for(auto it = m_nodeCache, itEnd = m_nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
43  it->m_fromCache=true;
44  }
45 }
46 
47 // WaitingTaskList::WaitingTaskList(const WaitingTaskList& rhs)
48 // {
49 // // do actual copying here;
50 // }
51 
// NOTE(review): listing line 52, which carries this definition's signature, is
// absent here; from its position and body this is presumably
// WaitingTaskList::~WaitingTaskList() -- confirm against the original file.
// Releases the node array that was allocated with new[]. This body does not
// walk m_head, so nothing still linked in the list is touched here.
53 {
54  delete [] m_nodeCache;
55 }
56 
57 //
58 // assignment operators
59 //
60 // const WaitingTaskList& WaitingTaskList::operator=(const WaitingTaskList& rhs)
61 // {
62 // //An exception safe implementation is
63 // WaitingTaskList temp(rhs);
64 // swap(rhs);
65 //
66 // return *this;
67 // }
68 
69 //
70 // member functions
71 //
// Resets access to the resource so that added tasks will wait (wording taken
// from the declaration summary in this listing).
// NOTE(review): listing line 73 (the "WaitingTaskList::reset()" signature) and
// listing line 77 are absent here. Line 77 presumably sets
// m_lastAssignedCacheIndex back to 0 -- confirm against the original file.
72 void
74 {
75  m_waiting = true;
// Number of nodes handed out by createNode() since the index was last zeroed.
76  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
// The list is expected to be empty at this point; announce() leaves m_head at 0
// when it takes the list.
78  assert(m_head == 0);
79  if (nSeenTasks > m_nodeCacheSize) {
80  //need to expand so next time we don't have to do any
81  // memory requests
82  delete [] m_nodeCache;
83  m_nodeCacheSize = nSeenTasks;
84  m_nodeCache = new WaitNode[nSeenTasks];
// Newly allocated nodes must be tagged as cache nodes, as in the constructor,
// so that announce() does not delete them individually.
85  for(auto it = m_nodeCache, itEnd = m_nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
86  it->m_fromCache=true;
87  }
88  }
89 }
90 
// Obtains a node for iTask, taken from the preallocated array while slots remain
// and allocated with new otherwise.
//   iTask:   task pointer stored in the node's m_task.
//   returns: the node, with m_next pointing at the node itself.
// NOTE(review): listing line 91 (the return-type line) is absent here; the
// declaration summary in this listing gives the return type as WaitNode*.
92 WaitingTaskList::createNode(tbb::task* iTask)
93 {
// Post-increment hands out the next slot index; m_lastAssignedCacheIndex is
// declared as std::atomic<unsigned int> in this listing's declaration summary.
94  unsigned int index = m_lastAssignedCacheIndex++;
95 
96  WaitNode* returnValue;
97  if( index < m_nodeCacheSize) {
98  returnValue = m_nodeCache+index;
99  } else {
// Array exhausted: allocate a standalone node and flag it so that announce()
// deletes it after use.
100  returnValue = new WaitNode;
101  returnValue->m_fromCache=false;
102  }
103  returnValue->m_task = iTask;
// A node whose next pointer is itself means "not linked yet"; announce() spins
// on this condition until add() calls setNextNode().
104  returnValue->m_next = returnValue;
105 
106  return returnValue;
107 }
108 
109 
// Adds iTask to the waiting list, or releases it immediately when the list is
// no longer in the waiting state.
//   iTask: task whose reference count is incremented on entry. The matching
//          decrement, and the spawn when the count reaches 0, happen either
//          here (not waiting) or later in announce().
110 void
111 WaitingTaskList::add(tbb::task* iTask) {
112  iTask->increment_ref_count();
113  if(!m_waiting) {
// Not waiting: undo the increment and spawn the task if nothing else holds it.
114  if(0==iTask->decrement_ref_count()) {
115  tbb::task::spawn(*iTask);
116  }
117  } else {
// Make the new node the head first; the old head is linked behind it only
// afterwards, which is why the node starts out pointing at itself.
118  WaitNode* newHead = createNode(iTask);
119  WaitNode* oldHead = m_head.exchange(newHead);
120  if(oldHead) {
121  newHead->setNextNode(oldHead);
122  //NOTE: even if 'm_waiting' changed, we don't
123  // have to recheck since we beat 'announce()' in
124  // the ordering of 'm_head.exchange' call so iTask
125  // is guaranteed to be in the link list
126  } else {
// The list was empty before the exchange: terminate it with a null next pointer.
127  newHead->setNextNode(0);
128  if(!m_waiting) {
129  //if finished waiting right before we did the
130  // exchange our task will not be spawned. Also,
131  // additional threads may be calling add() and swapping
132  // heads and linking us to the new head.
133  // It is safe to call announce from multiple threads
134  announce();
135  }
136 
137  }
138  }
139 }
140 
// Detaches the whole list from m_head and releases every task in it: each
// task's reference count is decremented and the task is spawned when the
// count reaches 0.
// NOTE(review): listing line 142 (the signature line) is absent here; add() and
// doneWaiting() call this as announce(), so it is presumably
// WaitingTaskList::announce() -- confirm against the original file.
141 void
143 {
144  //Need a temporary storage since one of these tasks could
145  // cause the next event to start processing which would refill
146  // this waiting list after it has been reset
// After this exchange m_head is 0 and this call holds the detached chain in n.
147  WaitNode* n = m_head.exchange(0);
148  WaitNode* next;
149  while(n) {
150  //it is possible that 'WaitingTaskList::add' is running in a different
151  // thread and we have a new 'head' but the old head has not yet been
152  // attached to the new head (we identify this since 'nextNode' will return itself).
153  // In that case we have to wait until the link has been established before going on.
// 'next' is read here, before the node can be deleted further down.
154  while(n == (next=n->nextNode())) {
155  hardware_pause();
156  }
157  auto t = n->m_task;
158  if(0==t->decrement_ref_count()){
159  tbb::task::spawn(*t);
160  }
// Only nodes that createNode() allocated with new are deleted one by one; the
// rest live in the m_nodeCache array.
161  if(!n->m_fromCache ) {
162  delete n;
163  }
164  n=next;
165  }
166 }
167 
// Signals that the resource is now available and tasks should be spawned
// (wording taken from the declaration summary in this listing).
// NOTE(review): listing line 169 (the "WaitingTaskList::doneWaiting()"
// signature) is absent here -- confirm against the original file.
168 void
170 {
// Clear the flag before draining: add() re-reads m_waiting after its exchange
// and calls announce() itself when it finds the flag already cleared.
171  m_waiting=false;
172  announce();
173 }
WaitNode * createNode(tbb::task *iTask)
std::atomic< bool > m_waiting
assert(m_qm.get())
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
void reset()
Resets access to the resource so that added tasks will wait.
std::atomic< WaitNode * > m_next
void add(tbb::task *)
Adds task to the waiting list.
WaitingTaskList(unsigned int iInitialSize=2)
Constructor.
void doneWaiting()
Signals that the resource is now available and tasks should be spawned.
unsigned int m_nodeCacheSize
void setNextNode(WaitNode *iNext)