CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
edm::SerialTaskQueueChain Class Reference

#include <SerialTaskQueueChain.h>

Public Member Functions

std::size_t numberOfQueues () const
 
SerialTaskQueueChainoperator= (const SerialTaskQueueChain &)=delete
 
SerialTaskQueueChainoperator= (SerialTaskQueueChain &&iOld)
 
unsigned long outstandingTasks () const
 
template<typename T >
void push (T &&iAction)
 asynchronously pushes functor iAction into queue More...
 
template<typename T >
void pushAndWait (T &&iAction)
 synchronously pushes functor iAction into queue More...
 
 SerialTaskQueueChain ()
 
 SerialTaskQueueChain (std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
 
 SerialTaskQueueChain (const SerialTaskQueueChain &)=delete
 
 SerialTaskQueueChain (SerialTaskQueueChain &&iOld)
 

Private Member Functions

template<typename T >
void actionToRun (T &&iAction)
 
template<typename T >
void passDownChain (unsigned int iIndex, T &&iAction)
 

Private Attributes

std::atomic< unsigned long > m_outstandingTasks {0}
 
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
 

Detailed Description

Definition at line 31 of file SerialTaskQueueChain.h.

Constructor & Destructor Documentation

edm::SerialTaskQueueChain::SerialTaskQueueChain ( )
inline

Definition at line 35 of file SerialTaskQueueChain.h.

Referenced by SerialTaskQueueChain().

35 {}
edm::SerialTaskQueueChain::SerialTaskQueueChain ( std::vector< std::shared_ptr< SerialTaskQueue >>  iQueues)
inlineexplicit

Definition at line 36 of file SerialTaskQueueChain.h.

References operator=(), and SerialTaskQueueChain().

36  :
37  m_queues(std::move(iQueues)) {}
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
def move(src, dest)
Definition: eostools.py:510
edm::SerialTaskQueueChain::SerialTaskQueueChain ( const SerialTaskQueueChain )
delete
edm::SerialTaskQueueChain::SerialTaskQueueChain ( SerialTaskQueueChain &&  iOld)
inline

Definition at line 41 of file SerialTaskQueueChain.h.

41  :
42  m_queues(std::move(iOld.m_queues)),
43  m_outstandingTasks{ iOld.m_outstandingTasks.load() } {}
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
std::atomic< unsigned long > m_outstandingTasks
def move(src, dest)
Definition: eostools.py:510

Member Function Documentation

template<typename T >
void SerialTaskQueueChain::actionToRun ( T &&  iAction)
private

Definition at line 148 of file SerialTaskQueueChain.h.

Referenced by passDownChain(), and push().

148  {
149  //even if an exception happens we will resume the queues.
150  using Queues= std::vector<std::shared_ptr<SerialTaskQueue>>;
151  auto sentryAction = [](SerialTaskQueueChain* iChain) {
152  auto& vec = iChain->m_queues;
153  for(auto it = vec.rbegin()+1; it != vec.rend(); ++it) {
154  (*it)->resume();
155  }
156  --(iChain->m_outstandingTasks);
157  };
158 
159  std::unique_ptr<SerialTaskQueueChain,decltype(sentryAction)> sentry( this, sentryAction);
160  iAction();
161  }
std::size_t edm::SerialTaskQueueChain::numberOfQueues ( ) const
inline

Definition at line 75 of file SerialTaskQueueChain.h.

References m_queues.

Referenced by edm::SharedResourcesAcquirer::numberOfResources().

75 {return m_queues.size(); }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
SerialTaskQueueChain& edm::SerialTaskQueueChain::operator= ( const SerialTaskQueueChain )
delete

Referenced by SerialTaskQueueChain().

SerialTaskQueueChain& edm::SerialTaskQueueChain::operator= ( SerialTaskQueueChain &&  iOld)
inline

Definition at line 45 of file SerialTaskQueueChain.h.

References m_outstandingTasks, m_queues, eostools::move(), push(), and pushAndWait().

45  {
46  m_queues = std::move(iOld.m_queues);
47  m_outstandingTasks.store( iOld.m_outstandingTasks.load());
48  return *this;
49  }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
std::atomic< unsigned long > m_outstandingTasks
def move(src, dest)
Definition: eostools.py:510
unsigned long edm::SerialTaskQueueChain::outstandingTasks ( ) const
inline

Definition at line 74 of file SerialTaskQueueChain.h.

References m_outstandingTasks.

74 { return m_outstandingTasks; }
std::atomic< unsigned long > m_outstandingTasks
template<typename T >
void SerialTaskQueueChain::passDownChain ( unsigned int  iIndex,
T &&  iAction 
)
private

Definition at line 132 of file SerialTaskQueueChain.h.

References actionToRun(), and m_queues.

Referenced by push().

132  {
133  //Have to be sure the queue associated to this running task
134  // does not attempt to start another task
135  m_queues[iQueueIndex-1]->pause();
136  //is this the last queue?
137  if(iQueueIndex +1 == m_queues.size()) {
138  m_queues[iQueueIndex]->push([this,iAction]() mutable { this->actionToRun(iAction); });
139  } else {
140  auto nextQueue = iQueueIndex+1;
141  m_queues[iQueueIndex]->push([this, nextQueue, iAction]() mutable {
142  this->passDownChain(nextQueue, iAction);
143  });
144  }
145  }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
void passDownChain(unsigned int iIndex, T &&iAction)
template<typename T >
void SerialTaskQueueChain::push ( T &&  iAction)

asynchronously pushes functor iAction into queue

The function will return immediately and iAction will either process concurrently with the calling thread or wait until the protected resource becomes available or until a CPU becomes available.

Parameters
[in]iActionMust be a functor that takes no arguments and return no values.

Definition at line 91 of file SerialTaskQueueChain.h.

References actionToRun(), m_outstandingTasks, m_queues, and passDownChain().

Referenced by edm::EventProcessor::beginLumiAsync(), edm::EventProcessor::handleNextEventForStreamAsync(), operator=(), edm::InputProductResolver::prefetchAsync_(), edm::Worker::TaskQueueAdaptor::push(), pushAndWait(), and edm::ReducedProvenanceReader::ReducedProvenanceReader().

91  {
93  if(m_queues.size() == 1) {
94  m_queues[0]->push( [this,iAction]() mutable {this->actionToRun(iAction);} );
95  } else {
96  assert(!m_queues.empty());
97  m_queues[0]->push([this, iAction]() mutable {
98  this->passDownChain(1, iAction);
99  });
100  }
101  }
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
std::atomic< unsigned long > m_outstandingTasks
void passDownChain(unsigned int iIndex, T &&iAction)
template<typename T >
void SerialTaskQueueChain::pushAndWait ( T &&  iAction)

synchronously pushes functor iAction into queue

The function will wait until iAction has completed before returning. If another task is already running on the queue, the system is allowed to find another TBB task to execute while waiting for the iAction to finish. In that way the core is not idled while waiting.

Parameters
[in]iActionMust be a functor that takes no arguments and return no values.

Definition at line 104 of file SerialTaskQueueChain.h.

References TauDecayModes::dec, pyrootRender::destroy(), and push().

Referenced by operator=(), and edm::Worker::TaskQueueAdaptor::pushAndWait().

104  {
105  auto destry = [](tbb::task* iTask) { tbb::task::destroy(*iTask); };
106 
107  std::unique_ptr<tbb::task, decltype(destry)> waitTask( new (tbb::task::allocate_root()) tbb::empty_task, destry );
108  waitTask->set_ref_count(3);
109 
110  std::exception_ptr ptr;
111  auto waitTaskPtr = waitTask.get();
112  push([waitTaskPtr, iAction,&ptr](){
113  //must wait until exception ptr would be set
114  auto dec = [](tbb::task* iTask){ iTask->decrement_ref_count();};
115  std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr,dec);
116  try {
117  iAction();
118  }catch(...) {
119  ptr = std::current_exception();
120  }
121  });
122 
123  waitTask->decrement_ref_count();
124  waitTask->wait_for_all();
125 
126  if(ptr) {
127  std::rethrow_exception(ptr);
128  }
129  }
def destroy(e)
Definition: pyrootRender.py:13
void push(T &&iAction)
asynchronously pushes functor iAction into queue

Member Data Documentation

std::atomic<unsigned long> edm::SerialTaskQueueChain::m_outstandingTasks {0}
private

Definition at line 80 of file SerialTaskQueueChain.h.

Referenced by operator=(), outstandingTasks(), and push().

std::vector<std::shared_ptr<SerialTaskQueue> > edm::SerialTaskQueueChain::m_queues
private

Definition at line 79 of file SerialTaskQueueChain.h.

Referenced by numberOfQueues(), operator=(), passDownChain(), and push().