00001 #ifndef FWCore_Concurrency_SerialTaskQueue_h 00002 #define FWCore_Concurrency_SerialTaskQueue_h 00003 // -*- C++ -*- 00004 // 00005 // Package: Concurrency 00006 // Class : SerialTaskQueue 00007 // 00049 // 00050 // Original Author: Chris Jones 00051 // Created: Thu Feb 21 11:14:39 CST 2013 00052 // $Id: SerialTaskQueue.h,v 1.1 2013/02/21 22:14:10 chrjones Exp $ 00053 // 00054 00055 // system include files 00056 #include <atomic> 00057 00058 #include "tbb/task.h" 00059 #include "tbb/concurrent_queue.h" 00060 00061 // user include files 00062 00063 // forward declarations 00064 namespace edm { 00065 class SerialTaskQueue 00066 { 00067 public: 00068 SerialTaskQueue(): 00069 m_taskChosen{ATOMIC_FLAG_INIT}, 00070 m_pauseCount{0} 00071 { } 00072 00073 // ---------- const member functions --------------------- 00075 00078 bool isPaused() const { return m_pauseCount.load()==0;} 00079 00080 // ---------- member functions --------------------------- 00082 00090 bool pause() { 00091 return 1 == ++m_pauseCount; 00092 } 00093 00095 00102 bool resume(); 00103 00105 00111 template<typename T> 00112 void push(const T& iAction); 00113 00115 00122 template<typename T> 00123 void pushAndWait(const T& iAction); 00124 00126 00134 template<typename T> 00135 tbb::task* pushAndGetNextTaskToRun(const T& iAction); 00136 00137 private: 00138 SerialTaskQueue(const SerialTaskQueue&) = delete; 00139 const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete; 00140 00142 class TaskBase : public tbb::task { 00143 friend class SerialTaskQueue; 00144 TaskBase(): m_queue(0) {} 00145 00146 protected: 00147 tbb::task* finishedTask(); 00148 private: 00149 void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue;} 00150 00151 SerialTaskQueue* m_queue; 00152 }; 00153 00154 template< typename T> 00155 class QueuedTask : public TaskBase { 00156 public: 00157 QueuedTask( const T& iAction): 00158 m_action(iAction) {} 00159 00160 private: 00161 tbb::task* execute(); 00162 00163 T m_action; 00164 }; 00165 00166 friend class TaskBase; 00167 00168 void pushTask(TaskBase*); 00169 tbb::task* pushAndGetNextTask(TaskBase*); 00170 tbb::task* finishedTask(); 00171 //returns nullptr if a task is already being processed 00172 TaskBase* pickNextTask(); 00173 00174 void pushAndWait(tbb::empty_task* iWait,TaskBase*); 00175 00176 00177 // ---------- member data -------------------------------- 00178 tbb::concurrent_queue<TaskBase*> m_tasks; 00179 std::atomic_flag m_taskChosen; 00180 std::atomic<unsigned long> m_pauseCount; 00181 }; 00182 00183 template<typename T> 00184 void SerialTaskQueue::push(const T& iAction) { 00185 QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} }; 00186 pTask->setQueue(this); 00187 pushTask(pTask); 00188 } 00189 00190 template<typename T> 00191 void SerialTaskQueue::pushAndWait(const T& iAction) { 00192 tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task; 00193 waitTask->set_ref_count(2); 00194 QueuedTask<T>* pTask{ new (waitTask->allocate_child()) QueuedTask<T>{iAction} }; 00195 pTask->setQueue(this); 00196 pushAndWait(waitTask,pTask); 00197 } 00198 00199 template<typename T> 00200 tbb::task* SerialTaskQueue::pushAndGetNextTaskToRun(const T& iAction) { 00201 QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} }; 00202 pTask->setQueue(this); 00203 return pushAndGetNextTask(pTask); 00204 } 00205 00206 inline 00207 tbb::task* 00208 SerialTaskQueue::TaskBase::finishedTask() {return m_queue->finishedTask();} 00209 00210 template <typename T> 00211 tbb::task* 00212 SerialTaskQueue::QueuedTask<T>::execute() { 00213 try { 00214 this->m_action(); 00215 } catch(...) {} 00216 return this->finishedTask(); 00217 } 00218 00219 } 00220 00221 #endif