CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::SerialTaskQueueChain Class Reference

#include <SerialTaskQueueChain.h>

Public Member Functions

std::size_t numberOfQueues () const
 
SerialTaskQueueChain & operator= (const SerialTaskQueueChain &)=delete
 
SerialTaskQueueChain & operator= (SerialTaskQueueChain &&iOld)
 
unsigned long outstandingTasks () const
 
template<typename T >
void push (oneapi::tbb::task_group &iGroup, T &&iAction)
 asynchronously pushes functor iAction into queue More...
 
 SerialTaskQueueChain ()
 
 SerialTaskQueueChain (std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
 
 SerialTaskQueueChain (const SerialTaskQueueChain &)=delete
 
 SerialTaskQueueChain (SerialTaskQueueChain &&iOld)
 

Private Member Functions

template<typename T >
void actionToRun (T &&iAction)
 
template<typename T >
void passDownChain (unsigned int iIndex, oneapi::tbb::task_group &iGroup, T &&iAction)
 

Private Attributes

std::atomic< unsigned long > m_outstandingTasks {0}
 
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
 

Detailed Description

Definition at line 32 of file SerialTaskQueueChain.h.

Constructor & Destructor Documentation

◆ SerialTaskQueueChain() [1/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( )
inline

Definition at line 34 of file SerialTaskQueueChain.h.

34 {}

◆ SerialTaskQueueChain() [2/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( std::vector< std::shared_ptr< SerialTaskQueue >>  iQueues)
inline explicit

Definition at line 35 of file SerialTaskQueueChain.h.

36  : m_queues(std::move(iQueues)) {}
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
def move(src, dest)
Definition: eostools.py:511

◆ SerialTaskQueueChain() [3/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( const SerialTaskQueueChain & )
delete

◆ SerialTaskQueueChain() [4/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( SerialTaskQueueChain &&  iOld)
inline

Definition at line 40 of file SerialTaskQueueChain.h.

41  : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
std::atomic< unsigned long > m_outstandingTasks
def move(src, dest)
Definition: eostools.py:511

Member Function Documentation

◆ actionToRun()

template<typename T >
void SerialTaskQueueChain::actionToRun ( T &&  iAction)
private

Definition at line 101 of file SerialTaskQueueChain.h.

Referenced by passDownChain(), and push().

101  {
102  //even if an exception happens we will resume the queues.
103  using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
104  auto sentryAction = [](SerialTaskQueueChain* iChain) {
105  auto& vec = iChain->m_queues;
106  for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
107  (*it)->resume();
108  }
109  --(iChain->m_outstandingTasks);
110  };
111 
112  std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
113  iAction();
114  }

◆ numberOfQueues()

std::size_t edm::SerialTaskQueueChain::numberOfQueues ( ) const
inline

Definition at line 60 of file SerialTaskQueueChain.h.

References m_queues.

Referenced by edm::SharedResourcesAcquirer::numberOfResources().

60 { return m_queues.size(); }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues

◆ operator=() [1/2]

SerialTaskQueueChain& edm::SerialTaskQueueChain::operator= ( const SerialTaskQueueChain & )
delete

◆ operator=() [2/2]

SerialTaskQueueChain& edm::SerialTaskQueueChain::operator= ( SerialTaskQueueChain &&  iOld)
inline

Definition at line 43 of file SerialTaskQueueChain.h.

References m_outstandingTasks, m_queues, and eostools::move().

43  {
44  m_queues = std::move(iOld.m_queues);
45  m_outstandingTasks.store(iOld.m_outstandingTasks.load());
46  return *this;
47  }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
std::atomic< unsigned long > m_outstandingTasks
def move(src, dest)
Definition: eostools.py:511

◆ outstandingTasks()

unsigned long edm::SerialTaskQueueChain::outstandingTasks ( ) const
inline

Definition at line 59 of file SerialTaskQueueChain.h.

References m_outstandingTasks.

59 { return m_outstandingTasks; }
std::atomic< unsigned long > m_outstandingTasks

◆ passDownChain()

template<typename T >
void SerialTaskQueueChain::passDownChain ( unsigned int  iIndex,
oneapi::tbb::task_group &  iGroup,
T &&  iAction 
)
private

Definition at line 86 of file SerialTaskQueueChain.h.

References actionToRun(), and m_queues.

Referenced by push().

86  {
87  //Have to be sure the queue associated to this running task
88  // does not attempt to start another task
89  m_queues[iQueueIndex - 1]->pause();
90  //is this the last queue?
91  if (iQueueIndex + 1 == m_queues.size()) {
92  m_queues[iQueueIndex]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
93  } else {
94  auto nextQueue = iQueueIndex + 1;
95  m_queues[iQueueIndex]->push(
96  iGroup, [this, nextQueue, &iGroup, iAction]() mutable { this->passDownChain(nextQueue, iGroup, iAction); });
97  }
98  }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
void passDownChain(unsigned int iIndex, oneapi::tbb::task_group &iGroup, T &&iAction)

◆ push()

template<typename T >
void SerialTaskQueueChain::push ( oneapi::tbb::task_group &  iGroup,
T &&  iAction 
)

Asynchronously pushes the functor iAction into the queue.

The function returns immediately. iAction will either run concurrently with the calling thread, or wait until the protected resource becomes available or until a CPU becomes available.

Parameters
[in] iAction: Must be a functor that takes no arguments and returns no value.

Definition at line 75 of file SerialTaskQueueChain.h.

References actionToRun(), cms::cuda::assert(), m_outstandingTasks, m_queues, and passDownChain().

Referenced by edm::EventProcessor::beginLumiAsync(), edm::EventProcessor::beginRunAsync(), edm::EventProcessor::handleNextEventForStreamAsync(), edm::eventsetup::CallbackExternalWork< T, TAcquireFunc, TAcquireReturn, TProduceFunc, TProduceReturn, TRecord, TDecorator >::makeAcquireTask(), edm::Worker::TaskQueueAdaptor::push(), edm::EventProcessor::readAndMergeLumiEntriesAsync(), and edm::EventProcessor::readAndMergeRunEntriesAsync().

75  {
76  ++m_outstandingTasks;
77  if (m_queues.size() == 1) {
78  m_queues[0]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
79  } else {
80  assert(!m_queues.empty());
81  m_queues[0]->push(iGroup, [this, &iGroup, iAction]() mutable { this->passDownChain(1, iGroup, iAction); });
82  }
83  }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
assert(be >=bs)
void passDownChain(unsigned int iIndex, oneapi::tbb::task_group &iGroup, T &&iAction)
std::atomic< unsigned long > m_outstandingTasks

Member Data Documentation

◆ m_outstandingTasks

std::atomic<unsigned long> edm::SerialTaskQueueChain::m_outstandingTasks {0}
private

Definition at line 65 of file SerialTaskQueueChain.h.

Referenced by operator=(), outstandingTasks(), and push().

◆ m_queues

std::vector<std::shared_ptr<SerialTaskQueue> > edm::SerialTaskQueueChain::m_queues
private

Definition at line 64 of file SerialTaskQueueChain.h.

Referenced by numberOfQueues(), operator=(), passDownChain(), and push().