CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Private Member Functions | Private Attributes
edm::WaitingTaskList Class Reference

#include <WaitingTaskList.h>

Classes

struct  WaitNode
 

Public Member Functions

void add (WaitingTask *)
 Adds task to the waiting list. More...
 
void doneWaiting (std::exception_ptr iPtr)
 Signals that the resource is now available and tasks should be spawned. More...
 
void presetTaskAsFailed (std::exception_ptr iExcept)
 
void reset ()
 Resets access to the resource so that added tasks will wait. More...
 
 WaitingTaskList (unsigned int iInitialSize=2)
 Constructor. More...
 
 ~WaitingTaskList ()=default
 

Private Member Functions

void announce ()
 
WaitNodecreateNode (WaitingTask *iTask)
 
const WaitingTaskListoperator= (const WaitingTaskList &)=delete
 
 WaitingTaskList (const WaitingTaskList &)=delete
 

Private Attributes

std::exception_ptr m_exceptionPtr
 
std::atomic< WaitNode * > m_head
 
std::atomic< unsigned int > m_lastAssignedCacheIndex
 
std::unique_ptr< WaitNode[]> m_nodeCache
 
unsigned int m_nodeCacheSize
 
std::atomic< bool > m_waiting
 

Detailed Description

Definition at line 101 of file WaitingTaskList.h.

Constructor & Destructor Documentation

◆ WaitingTaskList() [1/2]

WaitingTaskList::WaitingTaskList ( unsigned int  iInitialSize = 2)
explicit

Constructor.

The WaitingTaskList is initial set to waiting.

Parameters
[in]iInitialSizespecifies the initial size of the cache used to hold waiting tasks. The value is only useful for optimization as the object can resize itself.

Definition at line 36 of file WaitingTaskList.cc.

37  : m_head{nullptr},
38  m_nodeCache{new WaitNode[iInitialSize]},
39  m_nodeCacheSize{iInitialSize},
41  m_waiting{true} {
42  auto nodeCache = m_nodeCache.get();
43  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
44  it->m_fromCache = true;
45  }
46 }

◆ ~WaitingTaskList()

edm::WaitingTaskList::~WaitingTaskList ( )
default

◆ WaitingTaskList() [2/2]

edm::WaitingTaskList::WaitingTaskList ( const WaitingTaskList )
privatedelete

Member Function Documentation

◆ add()

void WaitingTaskList::add ( WaitingTask iTask)

Adds task to the waiting list.

If doneWaiting() has already been called then the added task will immediately be spawned. If that is not the case then the task will be held until doneWaiting() is called and will then be spawned. Calls to add() and doneWaiting() can safely be done concurrently.

Definition at line 89 of file WaitingTaskList.cc.

89  {
90  iTask->increment_ref_count();
91  if (!m_waiting) {
92  if (UNLIKELY(bool(m_exceptionPtr))) {
94  }
95  if (0 == iTask->decrement_ref_count()) {
96  tbb::task::spawn(*iTask);
97  }
98  } else {
99  WaitNode* newHead = createNode(iTask);
100  //This exchange is sequentially consistent thereby
101  // ensuring ordering between it and setNextNode
102  WaitNode* oldHead = m_head.exchange(newHead);
103  newHead->setNextNode(oldHead);
104 
105  //For the case where oldHead != nullptr,
106  // even if 'm_waiting' changed, we don't
107  // have to recheck since we beat 'announce()' in
108  // the ordering of 'm_head.exchange' call so iTask
109  // is guaranteed to be in the link list
110 
111  if (nullptr == oldHead) {
112  if (!m_waiting) {
113  //if finished waiting right before we did the
114  // exchange our task will not be spawned. Also,
115  // additional threads may be calling add() and swapping
116  // heads and linking us to the new head.
117  // It is safe to call announce from multiple threads
118  announce();
119  }
120  }
121  }
122 }

References announce(), createNode(), edm::WaitingTask::dependentTaskFailed(), m_exceptionPtr, m_head, m_waiting, edm::WaitingTaskList::WaitNode::setNextNode(), and UNLIKELY.

Referenced by edm::Worker::callWhenDoneAsync(), edm::eventsetup::EventSetupRecordIOVQueue::checkForNewIOVs(), edm::Worker::doWork(), edm::Worker::doWorkAsync(), edm::Worker::doWorkNoPrefetchingAsync(), edm::InputProductResolver::prefetchAsync_(), edm::PuttableProductResolver::prefetchAsync_(), edm::UnscheduledProductResolver::prefetchAsync_(), edm::SwitchProducerProductResolver::prefetchAsync_(), edm::SwitchAliasProductResolver::prefetchAsync_(), edm::NoProcessProductResolver::prefetchAsync_(), edm::Path::processOneOccurrenceAsync(), counter.Counter::register(), SequenceTypes.Task::remove(), SequenceTypes.Task::replace(), and edm::eventsetup::EventSetupRecordIOVQueue::startNewIOVAsync().

◆ announce()

void WaitingTaskList::announce ( )
private

Handles spawning the tasks, safe to call from multiple threads

Definition at line 138 of file WaitingTaskList.cc.

138  {
139  //Need a temporary storage since one of these tasks could
140  // cause the next event to start processing which would refill
141  // this waiting list after it has been reset
142  WaitNode* n = m_head.exchange(nullptr);
143  WaitNode* next;
144  while (n) {
145  //it is possible that 'WaitingTaskList::add' is running in a different
146  // thread and we have a new 'head' but the old head has not yet been
147  // attached to the new head (we identify this since 'nextNode' will return itself).
148  // In that case we have to wait until the link has been established before going on.
149  while (n == (next = n->nextNode())) {
150  hardware_pause();
151  }
152  auto t = n->m_task;
153  if (UNLIKELY(bool(m_exceptionPtr))) {
154  t->dependentTaskFailed(m_exceptionPtr);
155  }
156  if (!n->m_fromCache) {
157  delete n;
158  }
159  n = next;
160 
161  //the task may indirectly call WaitingTaskList::reset
162  // so we need to call spawn after we are done using the node.
163  if (0 == t->decrement_ref_count()) {
164  tbb::task::spawn(*t);
165  }
166  }
167 }

References m_exceptionPtr, m_head, dqmiodumpmetadata::n, GetRecoTauVFromDQM_MC_cff::next, OrderedSet::t, and UNLIKELY.

Referenced by add(), and doneWaiting().

◆ createNode()

WaitingTaskList::WaitNode * WaitingTaskList::createNode ( WaitingTask iTask)
private

Definition at line 70 of file WaitingTaskList.cc.

70  {
71  unsigned int index = m_lastAssignedCacheIndex++;
72 
73  WaitNode* returnValue;
74  if (index < m_nodeCacheSize) {
75  returnValue = m_nodeCache.get() + index;
76  } else {
77  returnValue = new WaitNode;
78  returnValue->m_fromCache = false;
79  }
80  returnValue->m_task = iTask;
81  //No other thread can see m_next yet. The caller to create node
82  // will be doing a synchronization operation anyway which will
83  // make sure m_task and m_next are synched across threads
84  returnValue->m_next.store(returnValue, std::memory_order_relaxed);
85 
86  return returnValue;
87 }

References edm::WaitingTaskList::WaitNode::m_fromCache, m_lastAssignedCacheIndex, edm::WaitingTaskList::WaitNode::m_next, m_nodeCache, m_nodeCacheSize, and edm::WaitingTaskList::WaitNode::m_task.

Referenced by add().

◆ doneWaiting()

void WaitingTaskList::doneWaiting ( std::exception_ptr  iPtr)

◆ operator=()

const WaitingTaskList& edm::WaitingTaskList::operator= ( const WaitingTaskList )
privatedelete

◆ presetTaskAsFailed()

void WaitingTaskList::presetTaskAsFailed ( std::exception_ptr  iExcept)

Use in the case where you need to inform the parent task of a failure before some other child task which may be run later reports a different, but related failure. You must later call doneWaiting with same exception later in the same thread.

Definition at line 124 of file WaitingTaskList.cc.

124  {
125  if (iExcept and m_waiting) {
126  WaitNode* node = m_head.load();
127  while (node) {
128  WaitNode* next;
129  while (node == (next = node->nextNode())) {
130  hardware_pause();
131  }
132  node->m_task->dependentTaskFailed(iExcept);
133  node = next;
134  }
135  }
136 }

References edm::WaitingTask::dependentTaskFailed(), m_head, edm::WaitingTaskList::WaitNode::m_task, m_waiting, GetRecoTauVFromDQM_MC_cff::next, and edm::WaitingTaskList::WaitNode::nextNode().

Referenced by edm::Path::workerFinished().

◆ reset()

void WaitingTaskList::reset ( void  )

Resets access to the resource so that added tasks will wait.

The owner of the resouce calls reset() to make tasks wait. Calling reset() is NOT thread safe. The system must guarantee that no tasks are using the resource when reset() is called and neither add() nor doneWaiting() can be called concurrently with reset().

Definition at line 51 of file WaitingTaskList.cc.

51  {
52  m_exceptionPtr = std::exception_ptr{};
53  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
55  assert(m_head == nullptr);
56  if (nSeenTasks > m_nodeCacheSize) {
57  //need to expand so next time we don't have to do any
58  // memory requests
59  m_nodeCacheSize = nSeenTasks;
60  m_nodeCache.reset(new WaitNode[nSeenTasks]);
61  auto nodeCache = m_nodeCache.get();
62  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
63  it->m_fromCache = true;
64  }
65  }
66  //this will make sure all cores see the changes
67  m_waiting = true;
68 }

References cms::cuda::assert(), m_exceptionPtr, m_head, m_lastAssignedCacheIndex, m_nodeCache, m_nodeCacheSize, and m_waiting.

Referenced by edm::Path::processOneOccurrenceAsync(), edm::Worker::reset(), edm::InputProductResolver::resetProductData_(), edm::PuttableProductResolver::resetProductData_(), edm::UnscheduledProductResolver::resetProductData_(), edm::SwitchBaseProductResolver::resetProductData_(), and edm::NoProcessProductResolver::resetProductData_().

Member Data Documentation

◆ m_exceptionPtr

std::exception_ptr edm::WaitingTaskList::m_exceptionPtr
private

Definition at line 169 of file WaitingTaskList.h.

Referenced by add(), announce(), doneWaiting(), and reset().

◆ m_head

std::atomic<WaitNode*> edm::WaitingTaskList::m_head
private

Definition at line 167 of file WaitingTaskList.h.

Referenced by add(), announce(), presetTaskAsFailed(), and reset().

◆ m_lastAssignedCacheIndex

std::atomic<unsigned int> edm::WaitingTaskList::m_lastAssignedCacheIndex
private

Definition at line 171 of file WaitingTaskList.h.

Referenced by createNode(), and reset().

◆ m_nodeCache

std::unique_ptr<WaitNode[]> edm::WaitingTaskList::m_nodeCache
private

Definition at line 168 of file WaitingTaskList.h.

Referenced by createNode(), and reset().

◆ m_nodeCacheSize

unsigned int edm::WaitingTaskList::m_nodeCacheSize
private

Definition at line 170 of file WaitingTaskList.h.

Referenced by createNode(), and reset().

◆ m_waiting

std::atomic<bool> edm::WaitingTaskList::m_waiting
private

Definition at line 172 of file WaitingTaskList.h.

Referenced by add(), doneWaiting(), presetTaskAsFailed(), and reset().

dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
edm::WaitingTaskList::m_head
std::atomic< WaitNode * > m_head
Definition: WaitingTaskList.h:167
edm::WaitingTaskList::createNode
WaitNode * createNode(WaitingTask *iTask)
Definition: WaitingTaskList.cc:70
edm::WaitingTaskList::m_nodeCache
std::unique_ptr< WaitNode[]> m_nodeCache
Definition: WaitingTaskList.h:168
cms::cuda::assert
assert(be >=bs)
edm::WaitingTaskList::m_waiting
std::atomic< bool > m_waiting
Definition: WaitingTaskList.h:172
edm::WaitingTaskList::m_lastAssignedCacheIndex
std::atomic< unsigned int > m_lastAssignedCacheIndex
Definition: WaitingTaskList.h:171
UNLIKELY
#define UNLIKELY(x)
Definition: Likely.h:21
OrderedSet.t
t
Definition: OrderedSet.py:90
edm::WaitingTask::dependentTaskFailed
void dependentTaskFailed(std::exception_ptr iPtr)
Called if waited for task failed.
Definition: WaitingTask.h:59
hardware_pause
edm::WaitingTaskList::m_nodeCacheSize
unsigned int m_nodeCacheSize
Definition: WaitingTaskList.h:170
edm::WaitingTaskList::announce
void announce()
Definition: WaitingTaskList.cc:138
edm::WaitingTaskList::m_exceptionPtr
std::exception_ptr m_exceptionPtr
Definition: WaitingTaskList.h:169
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
GetRecoTauVFromDQM_MC_cff.next
next
Definition: GetRecoTauVFromDQM_MC_cff.py:31