CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/FWCore/Concurrency/interface/SerialTaskQueue.h

Go to the documentation of this file.
00001 #ifndef FWCore_Concurrency_SerialTaskQueue_h
00002 #define FWCore_Concurrency_SerialTaskQueue_h
00003 // -*- C++ -*-
00004 //
00005 // Package:     Concurrency
00006 // Class  :     SerialTaskQueue
00007 // 
00049 //
00050 // Original Author:  Chris Jones
00051 //         Created:  Thu Feb 21 11:14:39 CST 2013
00052 // $Id: SerialTaskQueue.h,v 1.1 2013/02/21 22:14:10 chrjones Exp $
00053 //
00054 
00055 // system include files
00056 #include <atomic>
00057 
00058 #include "tbb/task.h"
00059 #include "tbb/concurrent_queue.h"
00060 
00061 // user include files
00062 
00063 // forward declarations
00064 namespace edm {
00065    class SerialTaskQueue
00066    {
00067    public:
00068       SerialTaskQueue():
00069       m_taskChosen{ATOMIC_FLAG_INIT},
00070       m_pauseCount{0}
00071       {  }
00072       
00073       // ---------- const member functions ---------------------
00075 
00078       bool isPaused() const { return m_pauseCount.load()==0;}
00079       
00080       // ---------- member functions ---------------------------
00082 
00090       bool pause() {
00091          return 1 == ++m_pauseCount;
00092       }
00093       
00095 
00102       bool resume();
00103       
00105 
00111       template<typename T>
00112       void push(const T& iAction);
00113       
00115 
00122       template<typename T>
00123       void pushAndWait(const T& iAction);
00124       
00126 
00134       template<typename T>
00135       tbb::task* pushAndGetNextTaskToRun(const T& iAction);
00136       
00137    private:
00138       SerialTaskQueue(const SerialTaskQueue&) = delete;
00139       const SerialTaskQueue& operator=(const SerialTaskQueue&) = delete;
00140       
00142       class TaskBase : public tbb::task {
00143          friend class SerialTaskQueue;
00144          TaskBase(): m_queue(0) {}
00145          
00146       protected:
00147          tbb::task* finishedTask();
00148       private:
00149          void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue;}
00150          
00151          SerialTaskQueue* m_queue;
00152       };
00153       
00154       template< typename T>
00155       class QueuedTask : public TaskBase {
00156       public:
00157          QueuedTask( const T& iAction):
00158          m_action(iAction) {}
00159          
00160       private:
00161          tbb::task* execute();
00162          
00163          T m_action;
00164       };
00165       
00166       friend class TaskBase;
00167       
00168       void pushTask(TaskBase*);
00169       tbb::task* pushAndGetNextTask(TaskBase*);
00170       tbb::task* finishedTask();
00171       //returns nullptr if a task is already being processed
00172       TaskBase* pickNextTask();
00173       
00174       void pushAndWait(tbb::empty_task* iWait,TaskBase*);
00175       
00176       
00177       // ---------- member data --------------------------------
00178       tbb::concurrent_queue<TaskBase*> m_tasks;
00179       std::atomic_flag m_taskChosen;
00180       std::atomic<unsigned long> m_pauseCount;
00181    };
00182    
00183    template<typename T>
00184    void SerialTaskQueue::push(const T& iAction) {
00185       QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
00186       pTask->setQueue(this);
00187       pushTask(pTask);
00188    }
00189    
00190    template<typename T>
00191    void SerialTaskQueue::pushAndWait(const T& iAction) {
00192       tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
00193       waitTask->set_ref_count(2);
00194       QueuedTask<T>* pTask{ new (waitTask->allocate_child()) QueuedTask<T>{iAction} };
00195       pTask->setQueue(this);
00196       pushAndWait(waitTask,pTask);
00197    }
00198    
00199    template<typename T>
00200    tbb::task* SerialTaskQueue::pushAndGetNextTaskToRun(const T& iAction) {
00201       QueuedTask<T>* pTask{ new (tbb::task::allocate_root()) QueuedTask<T>{iAction} };
00202       pTask->setQueue(this);
00203       return pushAndGetNextTask(pTask);
00204    }
00205    
00206    inline
00207    tbb::task*
00208    SerialTaskQueue::TaskBase::finishedTask() {return m_queue->finishedTask();}
00209    
00210    template <typename T>
00211    tbb::task*
00212    SerialTaskQueue::QueuedTask<T>::execute() {
00213       try {
00214          this->m_action();
00215       } catch(...) {}
00216       return this->finishedTask();
00217    }
00218    
00219 }
00220 
00221 #endif