CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::SerialTaskQueueChain Class Reference

#include <SerialTaskQueueChain.h>

Public Member Functions

std::size_t numberOfQueues () const
 
SerialTaskQueueChain & operator= (const SerialTaskQueueChain &)=delete
 
SerialTaskQueueChain & operator= (SerialTaskQueueChain &&iOld)
 
unsigned long outstandingTasks () const
 
template<typename T >
void push (T &&iAction)
 asynchronously pushes functor iAction into queue More...
 
template<typename T >
void pushAndWait (T &&iAction)
 synchronously pushes functor iAction into queue More...
 
 SerialTaskQueueChain ()
 
 SerialTaskQueueChain (const SerialTaskQueueChain &)=delete
 
 SerialTaskQueueChain (SerialTaskQueueChain &&iOld)
 
 SerialTaskQueueChain (std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
 

Private Member Functions

template<typename T >
void actionToRun (T &&iAction)
 
template<typename T >
void passDownChain (unsigned int iIndex, T &&iAction)
 

Private Attributes

std::atomic< unsigned long > m_outstandingTasks {0}
 
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
 

Detailed Description

Definition at line 32 of file SerialTaskQueueChain.h.

Constructor & Destructor Documentation

◆ SerialTaskQueueChain() [1/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( )
inline

Definition at line 34 of file SerialTaskQueueChain.h.

34 {}

Referenced by actionToRun().

◆ SerialTaskQueueChain() [2/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( std::vector< std::shared_ptr< SerialTaskQueue >>  iQueues)
inline explicit

Definition at line 35 of file SerialTaskQueueChain.h.

36  : m_queues(std::move(iQueues)) {}

◆ SerialTaskQueueChain() [3/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( const SerialTaskQueueChain & )
delete

◆ SerialTaskQueueChain() [4/4]

edm::SerialTaskQueueChain::SerialTaskQueueChain ( SerialTaskQueueChain &&  iOld)
inline

Definition at line 40 of file SerialTaskQueueChain.h.

41  : m_queues(std::move(iOld.m_queues)), m_outstandingTasks{iOld.m_outstandingTasks.load()} {}

Member Function Documentation

◆ actionToRun()

template<typename T >
void SerialTaskQueueChain::actionToRun ( T &&  iAction)
private

Definition at line 138 of file SerialTaskQueueChain.h.

138  {
139  //even if an exception happens we will resume the queues.
140  using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
141  auto sentryAction = [](SerialTaskQueueChain* iChain) {
142  auto& vec = iChain->m_queues;
143  for (auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
144  (*it)->resume();
145  }
146  --(iChain->m_outstandingTasks);
147  };
148 
149  std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(this, sentryAction);
150  iAction();
151  }

References SerialTaskQueueChain().

Referenced by passDownChain(), and push().

◆ numberOfQueues()

std::size_t edm::SerialTaskQueueChain::numberOfQueues ( ) const
inline

Definition at line 71 of file SerialTaskQueueChain.h.

71 { return m_queues.size(); }

References m_queues.

Referenced by edm::SharedResourcesAcquirer::numberOfResources().

◆ operator=() [1/2]

SerialTaskQueueChain& edm::SerialTaskQueueChain::operator= ( const SerialTaskQueueChain & )
delete

◆ operator=() [2/2]

SerialTaskQueueChain& edm::SerialTaskQueueChain::operator= ( SerialTaskQueueChain &&  iOld)
inline

Definition at line 43 of file SerialTaskQueueChain.h.

43  {
44  m_queues = std::move(iOld.m_queues);
45  m_outstandingTasks.store(iOld.m_outstandingTasks.load());
46  return *this;
47  }

References m_outstandingTasks, m_queues, and eostools::move().

◆ outstandingTasks()

unsigned long edm::SerialTaskQueueChain::outstandingTasks ( ) const
inline

Definition at line 70 of file SerialTaskQueueChain.h.

70 { return m_outstandingTasks; }

References m_outstandingTasks.

◆ passDownChain()

template<typename T >
void SerialTaskQueueChain::passDownChain ( unsigned int  iIndex,
T &&  iAction 
)
private

Definition at line 124 of file SerialTaskQueueChain.h.

124  {
125  //Have to be sure the queue associated to this running task
126  // does not attempt to start another task
127  m_queues[iQueueIndex - 1]->pause();
128  //is this the last queue?
129  if (iQueueIndex + 1 == m_queues.size()) {
130  m_queues[iQueueIndex]->push([this, iAction]() mutable { this->actionToRun(iAction); });
131  } else {
132  auto nextQueue = iQueueIndex + 1;
133  m_queues[iQueueIndex]->push([this, nextQueue, iAction]() mutable { this->passDownChain(nextQueue, iAction); });
134  }
135  }

References actionToRun(), and m_queues.

Referenced by push().

◆ push()

template<typename T >
void SerialTaskQueueChain::push ( T &&  iAction)

asynchronously pushes functor iAction into queue

The function will return immediately and iAction will either process concurrently with the calling thread or wait until the protected resource becomes available or until a CPU becomes available.

Parameters
[in] iAction: Must be a functor that takes no arguments and returns no value.

Definition at line 86 of file SerialTaskQueueChain.h.

86  {
87  ++m_outstandingTasks;
88  if (m_queues.size() == 1) {
89  m_queues[0]->push([this, iAction]() mutable { this->actionToRun(iAction); });
90  } else {
91  assert(!m_queues.empty());
92  m_queues[0]->push([this, iAction]() mutable { this->passDownChain(1, iAction); });
93  }
94  }

References actionToRun(), cms::cuda::assert(), m_outstandingTasks, m_queues, and passDownChain().

Referenced by edm::EventProcessor::beginLumiAsync(), edm::EventProcessor::handleNextEventForStreamAsync(), edm::Worker::TaskQueueAdaptor::push(), and pushAndWait().

◆ pushAndWait()

template<typename T >
void SerialTaskQueueChain::pushAndWait ( T &&  iAction)

synchronously pushes functor iAction into queue

The function will wait until iAction has completed before returning. If another task is already running on the queue, the system is allowed to find another TBB task to execute while waiting for the iAction to finish. In that way the core is not idled while waiting.

Parameters
[in] iAction: Must be a functor that takes no arguments and returns no value.

Definition at line 97 of file SerialTaskQueueChain.h.

97  {
98  auto destry = [](tbb::task* iTask) { tbb::task::destroy(*iTask); };
99 
100  std::unique_ptr<tbb::task, decltype(destry)> waitTask(new (tbb::task::allocate_root()) tbb::empty_task, destry);
101  waitTask->set_ref_count(3);
102 
103  std::exception_ptr ptr;
104  auto waitTaskPtr = waitTask.get();
105  push([waitTaskPtr, iAction, &ptr]() {
106  //must wait until exception ptr would be set
107  auto dec = [](tbb::task* iTask) { iTask->decrement_ref_count(); };
108  std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr, dec);
109  // Caught exception is rethrown further below.
110  CMS_SA_ALLOW try { iAction(); } catch (...) {
111  ptr = std::current_exception();
112  }
113  });
114 
115  waitTask->decrement_ref_count();
116  waitTask->wait_for_all();
117 
118  if (ptr) {
119  std::rethrow_exception(ptr);
120  }
121  }

References CMS_SA_ALLOW, TauDecayModes::dec, push(), and TrackValidation_cff::task.

Referenced by edm::Worker::TaskQueueAdaptor::pushAndWait().

Member Data Documentation

◆ m_outstandingTasks

std::atomic<unsigned long> edm::SerialTaskQueueChain::m_outstandingTasks {0}
private

Definition at line 76 of file SerialTaskQueueChain.h.

Referenced by operator=(), outstandingTasks(), and push().

◆ m_queues

std::vector<std::shared_ptr<SerialTaskQueue> > edm::SerialTaskQueueChain::m_queues
private

Definition at line 75 of file SerialTaskQueueChain.h.

Referenced by numberOfQueues(), operator=(), passDownChain(), and push().

edm::SerialTaskQueueChain::actionToRun
void actionToRun(T &&iAction)
Definition: SerialTaskQueueChain.h:138
edm::SerialTaskQueueChain::SerialTaskQueueChain
SerialTaskQueueChain()
Definition: SerialTaskQueueChain.h:34
cms::cuda::assert
assert(be >=bs)
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::SerialTaskQueueChain::m_outstandingTasks
std::atomic< unsigned long > m_outstandingTasks
Definition: SerialTaskQueueChain.h:76
edm::SerialTaskQueueChain::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:86
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
cms::cuda::device::unique_ptr
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
Definition: device_unique_ptr.h:33
eostools.move
def move(src, dest)
Definition: eostools.py:511
SerialTaskQueueChain
edm::SerialTaskQueueChain::m_queues
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
Definition: SerialTaskQueueChain.h:75
TauDecayModes.dec
dec
Definition: TauDecayModes.py:143
edm::SerialTaskQueueChain::passDownChain
void passDownChain(unsigned int iIndex, T &&iAction)
Definition: SerialTaskQueueChain.h:124