1 #ifndef FWCore_Concurrency_SerialTaskQueueChain_h 2 #define FWCore_Concurrency_SerialTaskQueueChain_h 56 void push(
T&& iAction);
74 std::vector<std::shared_ptr<SerialTaskQueue>>
m_queues;
99 std::unique_ptr<tbb::task, decltype(destry)> waitTask(
new (tbb::task::allocate_root()) tbb::empty_task, destry);
100 waitTask->set_ref_count(3);
102 std::exception_ptr ptr;
103 auto waitTaskPtr = waitTask.get();
104 push([waitTaskPtr, iAction, &ptr]() {
106 auto dec = [](
tbb::task* iTask) { iTask->decrement_ref_count(); };
107 std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr,
dec);
111 ptr = std::current_exception();
115 waitTask->decrement_ref_count();
116 waitTask->wait_for_all();
119 std::rethrow_exception(ptr);
123 template <
typename T>
129 if (iQueueIndex + 1 ==
m_queues.size()) {
132 auto nextQueue = iQueueIndex + 1;
133 m_queues[iQueueIndex]->push([
this, nextQueue, iAction]()
mutable { this->
passDownChain(nextQueue, iAction); });
137 template <
typename T>
140 using Queues = std::vector<std::shared_ptr<SerialTaskQueue>>;
142 auto& vec = iChain->m_queues;
143 for (
auto it = vec.rbegin() + 1; it != vec.rend(); ++it) {
146 --(iChain->m_outstandingTasks);
149 std::unique_ptr<SerialTaskQueueChain, decltype(sentryAction)> sentry(
this, sentryAction);
unsigned long outstandingTasks() const
SerialTaskQueueChain & operator=(const SerialTaskQueueChain &)=delete
std::vector< std::shared_ptr< SerialTaskQueue > > m_queues
void push(T &&iAction)
Asynchronously pushes the functor iAction into the queue.
std::atomic< unsigned long > m_outstandingTasks
std::size_t numberOfQueues() const
SerialTaskQueueChain(SerialTaskQueueChain &&iOld)
SerialTaskQueueChain & operator=(SerialTaskQueueChain &&iOld)
void pushAndWait(T &&iAction)
Synchronously pushes the functor iAction into the queue.
SerialTaskQueueChain(std::vector< std::shared_ptr< SerialTaskQueue >> iQueues)
void passDownChain(unsigned int iIndex, T &&iAction)
void actionToRun(T &&iAction)