CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Private Member Functions | Private Attributes
edm::WaitingTaskList Class Reference

#include <WaitingTaskList.h>

Classes

struct  WaitNode
 

Public Member Functions

void add (oneapi::tbb::task_group *, WaitingTask *)
 Adds task to the waiting list. More...
 
void add (WaitingTaskHolder)
 Adds task to the waiting list. More...
 
void doneWaiting (std::exception_ptr iPtr)
 Signals that the resource is now available and tasks should be spawned. More...
 
const WaitingTaskListoperator= (const WaitingTaskList &)=delete
 
void presetTaskAsFailed (std::exception_ptr iExcept)
 
void reset ()
 Resets access to the resource so that added tasks will wait. More...
 
 WaitingTaskList (unsigned int iInitialSize=2)
 Constructor. More...
 
 WaitingTaskList (const WaitingTaskList &)=delete
 
 ~WaitingTaskList ()=default
 

Private Member Functions

void announce ()
 
WaitNodecreateNode (oneapi::tbb::task_group *iGroup, WaitingTask *iTask)
 

Private Attributes

std::exception_ptr m_exceptionPtr
 
std::atomic< WaitNode * > m_head
 
std::atomic< unsigned int > m_lastAssignedCacheIndex
 
std::unique_ptr< WaitNode[]> m_nodeCache
 
unsigned int m_nodeCacheSize
 
std::atomic< bool > m_waiting
 

Detailed Description

Definition at line 84 of file WaitingTaskList.h.

Constructor & Destructor Documentation

◆ WaitingTaskList() [1/2]

WaitingTaskList::WaitingTaskList ( unsigned int  iInitialSize = 2)
explicit

Constructor.

The WaitingTaskList is initial set to waiting.

Parameters
[in]iInitialSizespecifies the initial size of the cache used to hold waiting tasks. The value is only useful for optimization as the object can resize itself.

Definition at line 38 of file WaitingTaskList.cc.

39  : m_head{nullptr},
40  m_nodeCache{new WaitNode[iInitialSize]},
41  m_nodeCacheSize{iInitialSize},
43  m_waiting{true} {
44  auto nodeCache = m_nodeCache.get();
45  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
46  it->m_fromCache = true;
47  }
48 }
std::atomic< bool > m_waiting
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
std::unique_ptr< WaitNode[]> m_nodeCache
unsigned int m_nodeCacheSize

◆ WaitingTaskList() [2/2]

edm::WaitingTaskList::WaitingTaskList ( const WaitingTaskList )
delete

◆ ~WaitingTaskList()

edm::WaitingTaskList::~WaitingTaskList ( )
default

Member Function Documentation

◆ add() [1/2]

void WaitingTaskList::add ( oneapi::tbb::task_group *  iGroup,
WaitingTask iTask 
)

Adds task to the waiting list.

If doneWaiting() has already been called then the added task will immediately be run. If that is not the case then the task will be held until doneWaiting() is called and will then be run. Calls to add() and doneWaiting() can safely be done concurrently.

Definition at line 125 of file WaitingTaskList.cc.

References announce(), createNode(), edm::TaskBase::decrement_ref_count(), edm::WaitingTask::dependentTaskFailed(), edm::TaskBase::execute(), edm::TaskBase::increment_ref_count(), m_exceptionPtr, m_head, m_waiting, alignCSCRings::s, edm::WaitingTaskList::WaitNode::setNextNode(), and UNLIKELY.

Referenced by edm::Worker::callWhenDoneAsync(), edm::eventsetup::EventSetupRecordIOVQueue::checkForNewIOVs(), edm::Worker::doWorkAsync(), edm::Worker::doWorkNoPrefetchingAsync(), edm::eventsetup::EventSetupRecordIOVQueue::endIOVAsync(), edm::eventsetup::ESSourceProductResolverBase::needToPrefetch(), edm::DelayedReaderInputProductResolver::prefetchAsync_(), edm::PuttableProductResolver::prefetchAsync_(), edm::UnscheduledProductResolver::prefetchAsync_(), edm::TransformingProductResolver::prefetchAsync_(), edm::SwitchProducerProductResolver::prefetchAsync_(), edm::SwitchAliasProductResolver::prefetchAsync_(), edm::NoProcessProductResolver::prefetchAsync_(), edm::eventsetup::ESProductResolverTemplate< ESTestRecordJ, ESTestDataJ >::prefetchAsyncImpl(), edm::eventsetup::CallbackBase< T, TProduceFunc, TProduceReturn, TRecord, TDecorator >::prefetchAsyncImpl(), edm::Path::processOneOccurrenceAsync(), counter.Counter::register(), SequenceTypes._TaskBase::remove(), SequenceTypes._TaskBase::replace(), and edm::eventsetup::EventSetupRecordIOVQueue::startNewIOVAsync().

125  {
126  iTask->increment_ref_count();
127  if (!m_waiting) {
128  if (UNLIKELY(bool(m_exceptionPtr))) {
130  }
131  if (0 == iTask->decrement_ref_count()) {
132  iGroup->run([iTask]() {
133  TaskSentry s{iTask};
134  iTask->execute();
135  });
136  }
137  } else {
138  WaitNode* newHead = createNode(iGroup, iTask);
139  //This exchange is sequentially consistent thereby
140  // ensuring ordering between it and setNextNode
141  WaitNode* oldHead = m_head.exchange(newHead);
142  newHead->setNextNode(oldHead);
143 
144  //For the case where oldHead != nullptr,
145  // even if 'm_waiting' changed, we don't
146  // have to recheck since we beat 'announce()' in
147  // the ordering of 'm_head.exchange' call so iTask
148  // is guaranteed to be in the link list
149 
150  if (nullptr == oldHead) {
151  if (!m_waiting) {
152  //if finished waiting right before we did the
153  // exchange our task will not be run. Also,
154  // additional threads may be calling add() and swapping
155  // heads and linking us to the new head.
156  // It is safe to call announce from multiple threads
157  announce();
158  }
159  }
160  }
161 }
WaitNode * createNode(oneapi::tbb::task_group *iGroup, WaitingTask *iTask)
std::atomic< bool > m_waiting
virtual void execute()=0
std::atomic< WaitNode * > m_head
void dependentTaskFailed(std::exception_ptr iPtr)
Called if waited for task failed.
Definition: WaitingTask.h:68
std::exception_ptr m_exceptionPtr
unsigned int decrement_ref_count()
Definition: TaskBase.h:42
#define UNLIKELY(x)
Definition: Likely.h:21
void increment_ref_count()
Definition: TaskBase.h:41

◆ add() [2/2]

void WaitingTaskList::add ( WaitingTaskHolder  iTask)

Adds task to the waiting list.

Calls to add() and doneWaiting() can safely be done concurrently.

Definition at line 92 of file WaitingTaskList.cc.

References announce(), createNode(), edm::WaitingTaskHolder::doneWaiting(), edm::WaitingTaskHolder::group(), m_exceptionPtr, m_head, m_waiting, edm::WaitingTaskHolder::release_no_decrement(), edm::WaitingTaskList::WaitNode::setNextNode(), and TrackValidation_cff::task.

Referenced by counter.Counter::register(), SequenceTypes._TaskBase::remove(), and SequenceTypes._TaskBase::replace().

92  {
93  if (!m_waiting) {
94  if (m_exceptionPtr) {
96  }
97  } else {
98  auto task = iTask.release_no_decrement();
99  WaitNode* newHead = createNode(iTask.group(), task);
100  //This exchange is sequentially consistent thereby
101  // ensuring ordering between it and setNextNode
102  WaitNode* oldHead = m_head.exchange(newHead);
103  newHead->setNextNode(oldHead);
104 
105  //For the case where oldHead != nullptr,
106  // even if 'm_waiting' changed, we don't
107  // have to recheck since we beat 'announce()' in
108  // the ordering of 'm_head.exchange' call so iTask
109  // is guaranteed to be in the link list
110 
111  if (nullptr == oldHead) {
112  newHead->setNextNode(nullptr);
113  if (!m_waiting) {
114  //if finished waiting right before we did the
115  // exchange our task will not be run. Also,
116  // additional threads may be calling add() and swapping
117  // heads and linking us to the new head.
118  // It is safe to call announce from multiple threads
119  announce();
120  }
121  }
122  }
123 }
WaitNode * createNode(oneapi::tbb::task_group *iGroup, WaitingTask *iTask)
std::atomic< bool > m_waiting
std::atomic< WaitNode * > m_head
oneapi::tbb::task_group * group() const noexcept
WaitingTask * release_no_decrement() noexcept
void doneWaiting(std::exception_ptr iExcept)
std::exception_ptr m_exceptionPtr

◆ announce()

void WaitingTaskList::announce ( )
private

Handles running the tasks, safe to call from multiple threads

Definition at line 177 of file WaitingTaskList.cc.

References g, m_exceptionPtr, m_head, dqmiodumpmetadata::n, GetRecoTauVFromDQM_MC_cff::next, alignCSCRings::s, submitPVValidationJobs::t, and UNLIKELY.

Referenced by add(), and doneWaiting().

177  {
178  //Need a temporary storage since one of these tasks could
179  // cause the next event to start processing which would refill
180  // this waiting list after it has been reset
181  WaitNode* n = m_head.exchange(nullptr);
182  WaitNode* next;
183  while (n) {
184  //it is possible that 'WaitingTaskList::add' is running in a different
185  // thread and we have a new 'head' but the old head has not yet been
186  // attached to the new head (we identify this since 'nextNode' will return itself).
187  // In that case we have to wait until the link has been established before going on.
188  while (n == (next = n->nextNode())) {
189  hardware_pause();
190  }
191  auto t = n->m_task;
192  auto g = n->m_group;
193  if (UNLIKELY(bool(m_exceptionPtr))) {
194  t->dependentTaskFailed(m_exceptionPtr);
195  }
196  if (!n->m_fromCache) {
197  delete n;
198  }
199  n = next;
200 
201  //the task may indirectly call WaitingTaskList::reset
202  // so we need to call spawn after we are done using the node.
203  if (0 == t->decrement_ref_count()) {
204  g->run([t]() {
205  TaskSentry s{t};
206  t->execute();
207  });
208  }
209  }
210 }
std::atomic< WaitNode * > m_head
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
std::exception_ptr m_exceptionPtr
#define UNLIKELY(x)
Definition: Likely.h:21

◆ createNode()

WaitingTaskList::WaitNode * WaitingTaskList::createNode ( oneapi::tbb::task_group *  iGroup,
WaitingTask iTask 
)
private

Definition at line 72 of file WaitingTaskList.cc.

References edm::WaitingTaskList::WaitNode::m_fromCache, edm::WaitingTaskList::WaitNode::m_group, m_lastAssignedCacheIndex, edm::WaitingTaskList::WaitNode::m_next, m_nodeCache, m_nodeCacheSize, and edm::WaitingTaskList::WaitNode::m_task.

Referenced by add().

72  {
73  unsigned int index = m_lastAssignedCacheIndex++;
74 
75  WaitNode* returnValue;
76  if (index < m_nodeCacheSize) {
77  returnValue = m_nodeCache.get() + index;
78  } else {
79  returnValue = new WaitNode;
80  returnValue->m_fromCache = false;
81  }
82  returnValue->m_task = iTask;
83  returnValue->m_group = iGroup;
84  //No other thread can see m_next yet. The caller to create node
85  // will be doing a synchronization operation anyway which will
86  // make sure m_task and m_next are synched across threads
87  returnValue->m_next.store(returnValue, std::memory_order_relaxed);
88 
89  return returnValue;
90 }
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::unique_ptr< WaitNode[]> m_nodeCache
unsigned int m_nodeCacheSize

◆ doneWaiting()

void WaitingTaskList::doneWaiting ( std::exception_ptr  iPtr)

Signals that the resource is now available and tasks should be spawned.

The owner of the resource calls this function to allow the waiting tasks to start accessing it. If the task fails, a non 'null' std::exception_ptr should be used. To have tasks wait again one must call reset(). Calls to add() and doneWaiting() can safely be done concurrently.

Definition at line 212 of file WaitingTaskList.cc.

References announce(), m_exceptionPtr, and m_waiting.

Referenced by edm::Worker::doWorkNoPrefetchingAsync(), edm::Path::finished(), edm::eventsetup::CallbackExternalWork< T, TAcquireFunc, TAcquireReturn, TProduceFunc, TProduceReturn, TRecord, TDecorator >::makeAcquireTask(), edm::eventsetup::CallbackBase< T, TProduceFunc, TProduceReturn, TRecord, TDecorator >::makeProduceTask(), edm::DelayedReaderInputProductResolver::prefetchAsync_(), edm::UnscheduledProductResolver::prefetchAsync_(), edm::TransformingProductResolver::prefetchAsync_(), edm::SwitchProducerProductResolver::prefetchAsync_(), edm::SwitchAliasProductResolver::prefetchAsync_(), edm::eventsetup::ESProductResolverTemplate< ESTestRecordJ, ESTestDataJ >::prefetchAsyncImpl(), edm::eventsetup::ESSourceProductResolverBase::prefetchAsyncImplTemplate(), edm::Worker::prePrefetchSelectionAsync(), edm::SwitchProducerProductResolver::putProduct(), edm::RunProcessingStatus::resetBeginResources(), edm::RunProcessingStatus::resetEndResources(), edm::LuminosityBlockProcessingStatus::resetResources(), edm::Worker::runModuleAfterAsyncPrefetch(), edm::NoProcessProductResolver::setCache(), edm::Worker::skipOnPath(), edm::eventsetup::EventSetupRecordIOVQueue::startNewIOVAsync(), edm::eventsetup::synchronousEventSetupForInstance(), and edm::LuminosityBlockProcessingStatus::~LuminosityBlockProcessingStatus().

212  {
213  m_exceptionPtr = iPtr;
214  m_waiting = false;
215  announce();
216 }
std::atomic< bool > m_waiting
std::exception_ptr m_exceptionPtr

◆ operator=()

const WaitingTaskList& edm::WaitingTaskList::operator= ( const WaitingTaskList )
delete

◆ presetTaskAsFailed()

void WaitingTaskList::presetTaskAsFailed ( std::exception_ptr  iExcept)

Use in the case where you need to inform the parent task of a failure before some other child task which may be run later reports a different, but related failure. You must later call doneWaiting with same exception later in the same thread.

Definition at line 163 of file WaitingTaskList.cc.

References edm::WaitingTask::dependentTaskFailed(), m_head, edm::WaitingTaskList::WaitNode::m_task, m_waiting, GetRecoTauVFromDQM_MC_cff::next, and edm::WaitingTaskList::WaitNode::nextNode().

Referenced by edm::Path::workerFinished().

163  {
164  if (iExcept and m_waiting) {
165  WaitNode* node = m_head.load();
166  while (node) {
167  WaitNode* next;
168  while (node == (next = node->nextNode())) {
169  hardware_pause();
170  }
171  node->m_task->dependentTaskFailed(iExcept);
172  node = next;
173  }
174  }
175 }
std::atomic< bool > m_waiting
std::atomic< WaitNode * > m_head

◆ reset()

void WaitingTaskList::reset ( void  )

Resets access to the resource so that added tasks will wait.

The owner of the resouce calls reset() to make tasks wait. Calling reset() is NOT thread safe. The system must guarantee that no tasks are using the resource when reset() is called and neither add() nor doneWaiting() can be called concurrently with reset().

Definition at line 53 of file WaitingTaskList.cc.

References cms::cuda::assert(), m_exceptionPtr, m_head, m_lastAssignedCacheIndex, m_nodeCache, m_nodeCacheSize, and m_waiting.

Referenced by edm::eventsetup::EventSetupRecordIOVQueue::endIOVAsync(), edm::eventsetup::ESSourceProductResolverBase::invalidateCache(), edm::eventsetup::ESProductResolverTemplate< ESTestRecordJ, ESTestDataJ >::invalidateCache(), edm::eventsetup::CallbackBase< T, TProduceFunc, TProduceReturn, TRecord, TDecorator >::newRecordComing(), edm::Path::processOneOccurrenceAsync(), edm::Worker::reset(), edm::DelayedReaderInputProductResolver::resetProductData_(), edm::UnscheduledProductResolver::resetProductData_(), edm::TransformingProductResolver::resetProductData_(), edm::SwitchBaseProductResolver::resetProductData_(), and edm::NoProcessProductResolver::resetProductData_().

53  {
54  m_exceptionPtr = std::exception_ptr{};
55  unsigned int nSeenTasks = m_lastAssignedCacheIndex;
57  assert(m_head == nullptr);
58  if (nSeenTasks > m_nodeCacheSize) {
59  //need to expand so next time we don't have to do any
60  // memory requests
61  m_nodeCacheSize = nSeenTasks;
62  m_nodeCache = std::make_unique<WaitNode[]>(nSeenTasks);
63  auto nodeCache = m_nodeCache.get();
64  for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) {
65  it->m_fromCache = true;
66  }
67  }
68  //this will make sure all cores see the changes
69  m_waiting = true;
70 }
std::atomic< bool > m_waiting
std::atomic< unsigned int > m_lastAssignedCacheIndex
std::atomic< WaitNode * > m_head
assert(be >=bs)
std::unique_ptr< WaitNode[]> m_nodeCache
std::exception_ptr m_exceptionPtr
unsigned int m_nodeCacheSize

Member Data Documentation

◆ m_exceptionPtr

std::exception_ptr edm::WaitingTaskList::m_exceptionPtr
private

Definition at line 157 of file WaitingTaskList.h.

Referenced by add(), announce(), doneWaiting(), and reset().

◆ m_head

std::atomic<WaitNode*> edm::WaitingTaskList::m_head
private

Definition at line 155 of file WaitingTaskList.h.

Referenced by add(), announce(), presetTaskAsFailed(), and reset().

◆ m_lastAssignedCacheIndex

std::atomic<unsigned int> edm::WaitingTaskList::m_lastAssignedCacheIndex
private

Definition at line 159 of file WaitingTaskList.h.

Referenced by createNode(), and reset().

◆ m_nodeCache

std::unique_ptr<WaitNode[]> edm::WaitingTaskList::m_nodeCache
private

Definition at line 156 of file WaitingTaskList.h.

Referenced by createNode(), and reset().

◆ m_nodeCacheSize

unsigned int edm::WaitingTaskList::m_nodeCacheSize
private

Definition at line 158 of file WaitingTaskList.h.

Referenced by createNode(), and reset().

◆ m_waiting

std::atomic<bool> edm::WaitingTaskList::m_waiting
private

Definition at line 160 of file WaitingTaskList.h.

Referenced by add(), doneWaiting(), presetTaskAsFailed(), and reset().