CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Private Member Functions | Private Attributes
edm::WaitingTaskList Class Reference

#include <WaitingTaskList.h>

Classes

struct  WaitNode
 

Public Member Functions

void add (WaitingTask *)
 Adds task to the waiting list. More...
 
void doneWaiting (std::exception_ptr iPtr)
 Signals that the resource is now available and tasks should be spawned. More...
 
void presetTaskAsFailed (std::exception_ptr iExcept)
 
void reset ()
 Resets access to the resource so that added tasks will wait. More...
 
 WaitingTaskList (unsigned int iInitialSize=2)
 Constructor. More...
 
 ~WaitingTaskList ()=default
 

Private Member Functions

void announce ()
 
WaitNode * createNode (WaitingTask *iTask)
 
const WaitingTaskList & operator= (const WaitingTaskList &)=delete
 
 WaitingTaskList (const WaitingTaskList &)=delete
 

Private Attributes

std::exception_ptr m_exceptionPtr
 
std::atomic< WaitNode * > m_head
 
std::atomic< unsigned int > m_lastAssignedCacheIndex
 
std::unique_ptr< WaitNode[]> m_nodeCache
 
unsigned int m_nodeCacheSize
 
std::atomic< bool > m_waiting
 

Detailed Description

Definition at line 102 of file WaitingTaskList.h.

Constructor & Destructor Documentation

WaitingTaskList::WaitingTaskList ( unsigned int  iInitialSize = 2)
explicit

Constructor.

The WaitingTaskList is initially set to waiting.

Parameters
[in] iInitialSize: specifies the initial size of the cache used to hold waiting tasks. The value is only useful for optimization as the object can resize itself.

Definition at line 36 of file WaitingTaskList.cc.

References m_lastAssignedCacheIndex, m_nodeCache, m_nodeCacheSize, and m_waiting.

36  :
37 m_head{nullptr},
38 m_nodeCache{new WaitNode[iInitialSize]},
39 m_nodeCacheSize{iInitialSize},
41 m_waiting{true}
42 {
43  auto nodeCache = m_nodeCache.get();
44  for(auto it = nodeCache, itEnd = nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
45  it->m_fromCache=true;
46  }
47 }
std::atomic< bool > m_waiting
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
std::unique_ptr< WaitNode[]> m_nodeCache
unsigned int m_nodeCacheSize
edm::WaitingTaskList::~WaitingTaskList ( )
default
edm::WaitingTaskList::WaitingTaskList ( const WaitingTaskList & )
private delete

Member Function Documentation

void WaitingTaskList::add ( WaitingTask * iTask)

Adds task to the waiting list.

If doneWaiting() has already been called then the added task will immediately be spawned. If that is not the case then the task will be held until doneWaiting() is called and will then be spawned. Calls to add() and doneWaiting() can safely be done concurrently.

Definition at line 96 of file WaitingTaskList.cc.

References announce(), createNode(), edm::WaitingTask::dependentTaskFailed(), m_exceptionPtr, m_head, m_waiting, edm::WaitingTaskList::WaitNode::setNextNode(), and UNLIKELY.

Referenced by edm::NoProcessProductResolver::prefetchAsync_(), edm::Path::processOneOccurrenceAsync(), counter.Counter::register(), SequenceTypes.Task::remove(), and SequenceTypes.Task::replace().

96  {
97  iTask->increment_ref_count();
98  if(!m_waiting) {
99  if(UNLIKELY(bool(m_exceptionPtr))) {
101  }
102  if(0==iTask->decrement_ref_count()) {
103  tbb::task::spawn(*iTask);
104  }
105  } else {
106  WaitNode* newHead = createNode(iTask);
107  //This exchange is sequentially consistent thereby
108  // ensuring ordering between it and setNextNode
109  WaitNode* oldHead = m_head.exchange(newHead);
110  newHead->setNextNode(oldHead);
111 
112  //For the case where oldHead != nullptr,
113  // even if 'm_waiting' changed, we don't
114  // have to recheck since we beat 'announce()' in
115  // the ordering of 'm_head.exchange' call so iTask
116  // is guaranteed to be in the link list
117 
118  if(nullptr == oldHead) {
119  if(!m_waiting) {
120  //if finished waiting right before we did the
121  // exchange our task will not be spawned. Also,
122  // additional threads may be calling add() and swapping
123  // heads and linking us to the new head.
124  // It is safe to call announce from multiple threads
125  announce();
126  }
127  }
128  }
129 }
std::atomic< bool > m_waiting
std::atomic< WaitNode * > m_head
void dependentTaskFailed(std::exception_ptr iPtr)
Called if waited for task failed.
Definition: WaitingTask.h:64
std::exception_ptr m_exceptionPtr
WaitNode * createNode(WaitingTask *iTask)
#define UNLIKELY(x)
void WaitingTaskList::announce ( )
private

Handles spawning the tasks, safe to call from multiple threads

Definition at line 147 of file WaitingTaskList.cc.

References m_exceptionPtr, edm::WaitingTaskList::WaitNode::m_fromCache, m_head, edm::WaitingTaskList::WaitNode::m_task, gen::n, GetRecoTauVFromDQM_MC_cff::next, edm::WaitingTaskList::WaitNode::nextNode(), lumiQTWidget::t, and UNLIKELY.

Referenced by add(), and doneWaiting().

148 {
149  //Need a temporary storage since one of these tasks could
150  // cause the next event to start processing which would refill
151  // this waiting list after it has been reset
152  WaitNode* n = m_head.exchange(nullptr);
153  WaitNode* next;
154  while(n) {
155  //it is possible that 'WaitingTaskList::add' is running in a different
156  // thread and we have a new 'head' but the old head has not yet been
157  // attached to the new head (we identify this since 'nextNode' will return itself).
158  // In that case we have to wait until the link has been established before going on.
159  while(n == (next=n->nextNode())) {
160  hardware_pause();
161  }
162  auto t = n->m_task;
163  if(UNLIKELY(bool(m_exceptionPtr))) {
164  t->dependentTaskFailed(m_exceptionPtr);
165  }
166  if(0==t->decrement_ref_count()){
167  tbb::task::spawn(*t);
168  }
169  if(!n->m_fromCache ) {
170  delete n;
171  }
172  n=next;
173  }
174 }
std::atomic< WaitNode * > m_head
std::exception_ptr m_exceptionPtr
#define UNLIKELY(x)
WaitingTaskList::WaitNode * WaitingTaskList::createNode ( WaitingTask * iTask)
private

Definition at line 74 of file WaitingTaskList.cc.

References edm::WaitingTaskList::WaitNode::m_fromCache, m_lastAssignedCacheIndex, edm::WaitingTaskList::WaitNode::m_next, m_nodeCache, m_nodeCacheSize, and edm::WaitingTaskList::WaitNode::m_task.

Referenced by add().

75 {
76  unsigned int index = m_lastAssignedCacheIndex++;
77 
78  WaitNode* returnValue;
79  if( index < m_nodeCacheSize) {
80  returnValue = m_nodeCache.get()+index;
81  } else {
82  returnValue = new WaitNode;
83  returnValue->m_fromCache=false;
84  }
85  returnValue->m_task = iTask;
86  //No other thread can see m_next yet. The caller to create node
87  // will be doing a synchronization operation anyway which will
88  // make sure m_task and m_next are synched across threads
89  returnValue->m_next.store(returnValue,std::memory_order_relaxed);
90 
91  return returnValue;
92 }
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::unique_ptr< WaitNode[]> m_nodeCache
unsigned int m_nodeCacheSize
void WaitingTaskList::doneWaiting ( std::exception_ptr  iPtr)

Signals that the resource is now available and tasks should be spawned.

The owner of the resource calls this function to allow the waiting tasks to start accessing it. If the task fails, a non 'null' std::exception_ptr should be used. To have tasks wait again one must call reset(). Calls to add() and doneWaiting() can safely be done concurrently.

Definition at line 177 of file WaitingTaskList.cc.

References announce(), m_exceptionPtr, and m_waiting.

Referenced by edm::Path::finished(), edm::Worker::prePrefetchSelectionAsync(), edm::NoProcessProductResolver::setCache(), and edm::Worker::skipOnPath().

178 {
179  m_exceptionPtr = iPtr;
180  m_waiting=false;
181  announce();
182 }
std::atomic< bool > m_waiting
std::exception_ptr m_exceptionPtr
const WaitingTaskList& edm::WaitingTaskList::operator= ( const WaitingTaskList & )
private delete
void WaitingTaskList::presetTaskAsFailed ( std::exception_ptr  iExcept)

Use in the case where you need to inform the parent task of a failure before some other child task, which may be run later, reports a different but related failure. You must subsequently call doneWaiting() with the same exception in the same thread.

Definition at line 132 of file WaitingTaskList.cc.

References edm::WaitingTask::dependentTaskFailed(), m_head, edm::WaitingTaskList::WaitNode::m_task, m_waiting, GetRecoTauVFromDQM_MC_cff::next, and edm::WaitingTaskList::WaitNode::nextNode().

Referenced by edm::Path::workerFinished().

132  {
133  if(iExcept and m_waiting) {
134  WaitNode* node = m_head.load();
135  while(node) {
136  WaitNode* next;
137  while(node == (next=node->nextNode())) {
138  hardware_pause();
139  }
140  node->m_task->dependentTaskFailed(iExcept);
141  node = next;
142  }
143  }
144 }
std::atomic< bool > m_waiting
std::atomic< WaitNode * > m_head
void WaitingTaskList::reset ( void  )

Resets access to the resource so that added tasks will wait.

The owner of the resource calls reset() to make tasks wait. Calling reset() is NOT thread safe. The system must guarantee that no tasks are using the resource when reset() is called and neither add() nor doneWaiting() can be called concurrently with reset().

Definition at line 53 of file WaitingTaskList.cc.

References m_exceptionPtr, m_head, m_lastAssignedCacheIndex, m_nodeCache, m_nodeCacheSize, and m_waiting.

Referenced by edm::Path::processOneOccurrenceAsync(), and edm::NoProcessProductResolver::resetProductData_().

54 {
55  m_exceptionPtr = std::exception_ptr{};
56  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
58  assert(m_head == nullptr);
59  if (nSeenTasks > m_nodeCacheSize) {
60  //need to expand so next time we don't have to do any
61  // memory requests
62  m_nodeCacheSize = nSeenTasks;
63  m_nodeCache.reset( new WaitNode[nSeenTasks] );
64  auto nodeCache = m_nodeCache.get();
65  for(auto it = nodeCache, itEnd = nodeCache+m_nodeCacheSize; it!=itEnd; ++it) {
66  it->m_fromCache=true;
67  }
68  }
69  //this will make sure all cores see the changes
70  m_waiting = true;
71 }
std::atomic< bool > m_waiting
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
std::unique_ptr< WaitNode[]> m_nodeCache
std::exception_ptr m_exceptionPtr
unsigned int m_nodeCacheSize

Member Data Documentation

std::exception_ptr edm::WaitingTaskList::m_exceptionPtr
private

Definition at line 177 of file WaitingTaskList.h.

Referenced by add(), announce(), doneWaiting(), and reset().

std::atomic<WaitNode*> edm::WaitingTaskList::m_head
private

Definition at line 175 of file WaitingTaskList.h.

Referenced by add(), announce(), presetTaskAsFailed(), and reset().

std::atomic<unsigned int> edm::WaitingTaskList::m_lastAssignedCacheIndex
private

Definition at line 179 of file WaitingTaskList.h.

Referenced by createNode(), reset(), and WaitingTaskList().

std::unique_ptr<WaitNode[]> edm::WaitingTaskList::m_nodeCache
private

Definition at line 176 of file WaitingTaskList.h.

Referenced by createNode(), reset(), and WaitingTaskList().

unsigned int edm::WaitingTaskList::m_nodeCacheSize
private

Definition at line 178 of file WaitingTaskList.h.

Referenced by createNode(), reset(), and WaitingTaskList().

std::atomic<bool> edm::WaitingTaskList::m_waiting
private

Definition at line 180 of file WaitingTaskList.h.

Referenced by add(), doneWaiting(), presetTaskAsFailed(), reset(), and WaitingTaskList().