1 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h 2 #define FWCore_Concurrency_SerialTaskQueueChain_h 61 void push(
const T& iAction);
79 std::vector<std::shared_ptr<SerialTaskQueue>>
m_queues;
97 m_queues[0]->push([
this, iAction]() {
107 std::unique_ptr<tbb::task, decltype(destry)> waitTask(
new (tbb::task::allocate_root()) tbb::empty_task, destry );
108 waitTask->set_ref_count(3);
110 std::exception_ptr ptr;
111 auto waitTaskPtr = waitTask.get();
112 push([waitTaskPtr, iAction,&ptr](){
114 auto dec = [](tbb::task* iTask){ iTask->decrement_ref_count();};
115 std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr,
dec);
119 ptr = std::current_exception();
123 waitTask->decrement_ref_count();
124 waitTask->wait_for_all();
127 std::rethrow_exception(ptr);
137 if(iQueueIndex +1 ==
m_queues.size()) {
140 auto nextQueue = iQueueIndex+1;
141 m_queues[iQueueIndex]->push([
this, nextQueue, iAction]() {
150 using Queues= std::vector<std::shared_ptr<SerialTaskQueue>>;
152 auto& vec = iChain->m_queues;
153 for(
auto it = vec.rbegin()+1; it != vec.rend(); ++it) {
156 --(iChain->m_outstandingTasks);
159 std::unique_ptr<SerialTaskQueueChain,decltype(sentryAction)> sentry(
this, sentryAction);
unsigned long outstandingTasks() const
SerialTaskQueueChain & operator=(const SerialTaskQueueChain &)=delete
void push(const T &iAction)
asynchronously pushes functor iAction into queue
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
void actionToRun(const T &iAction)
std::atomic< unsigned long > m_outstandingTasks
std::size_t numberOfQueues() const
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
SerialTaskQueueChain(SerialTaskQueueChain &&iOld)
SerialTaskQueueChain & operator=(SerialTaskQueueChain &&iOld)
SerialTaskQueueChain(std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
void passDownChain(unsigned int iIndex, const T &iAction)