CMS 3D CMS Logo

LimitedTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
2 #define FWCore_Concurrency_LimitedTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : LimitedTaskQueue
7 //
21 //
22 // Original Author: Chris Jones
23 // Created: Thu Feb 21 11:14:39 CST 2013
24 // $Id$
25 //
26 
27 // system include files
28 #include <atomic>
29 #include <vector>
30 #include <memory>
31 
33 
34 // user include files
35 
36 // forward declarations
37 namespace edm {
39  {
40  public:
41  LimitedTaskQueue(unsigned int iLimit):
42  m_queues{iLimit}
43  { }
44 
45 
46  // ---------- member functions ---------------------------
47 
49 
55  template<typename T>
56  void push(const T& iAction);
57 
59 
66  template<typename T>
67  void pushAndWait(const T& iAction);
68 
69  unsigned int concurrencyLimit() const { return m_queues.size(); }
70  private:
71  LimitedTaskQueue(const LimitedTaskQueue&) = delete;
72  const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
73 
74  // ---------- member data --------------------------------
75  std::vector<SerialTaskQueue> m_queues;
76  };
77 
78  template<typename T>
79  void LimitedTaskQueue::push(const T& iAction) {
80  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
81  for(auto& q: m_queues) {
82  q.push([set_to_run,iAction]() {
83  bool expected = false;
84  if(set_to_run->compare_exchange_strong(expected,true)) {
85  iAction();
86  }
87  });
88  }
89  }
90 
91  template<typename T>
92  void LimitedTaskQueue::pushAndWait(const T& iAction) {
93  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
94  waitTask->set_ref_count(2);
95  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
96  for(auto& q: m_queues) {
97  q.push([set_to_run,waitTask,iAction]() {
98  bool expected = false;
99  if(set_to_run->compare_exchange_strong(expected,true)) {
100  try {
101  iAction();
102  }catch(...) {}
103  waitTask->decrement_ref_count();
104  }
105  });
106  }
107  waitTask->wait_for_all();
108  tbb::task::destroy(*waitTask);
109  }
110 
111 }
112 
113 #endif
def destroy(e)
Definition: pyrootRender.py:13
unsigned int concurrencyLimit() const
const LimitedTaskQueue & operator=(const LimitedTaskQueue &)=delete
std::vector< SerialTaskQueue > m_queues
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
LimitedTaskQueue(unsigned int iLimit)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
HLT enums.
long double T