CMS 3D CMS Logo

LimitedTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
2 #define FWCore_Concurrency_LimitedTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : LimitedTaskQueue
7 //
21 //
22 // Original Author: Chris Jones
23 // Created: Thu Feb 21 11:14:39 CST 2013
24 // $Id$
25 //
26 
27 // system include files
28 #include <atomic>
29 #include <vector>
30 #include <memory>
31 
33 
34 // user include files
35 
36 // forward declarations
37 namespace edm {
39  {
40  public:
41  LimitedTaskQueue(unsigned int iLimit):
42  m_queues{iLimit}
43  { }
44 
45 
46  // ---------- member functions ---------------------------
47 
49 
55  template<typename T>
56  void push(T&& iAction);
57 
59 
66  template<typename T>
67  void pushAndWait(T&& iAction);
68 
69  class Resumer {
70  public:
71  friend class LimitedTaskQueue;
72 
73  Resumer() = default;
75  resume();
76  }
77 
78  Resumer(Resumer&& iOther): m_queue(iOther.m_queue){
79  iOther.m_queue = nullptr;
80  }
81 
82  Resumer(Resumer const& iOther): m_queue(iOther.m_queue) {
83  if(m_queue) {
84  m_queue->pause();
85  }
86  }
87 
88  Resumer& operator=(Resumer const& iOther) {
89  auto t = iOther;
90  return (*this = std::move(t));
91  }
92  Resumer& operator=(Resumer&& iOther) {
93  if(m_queue) { m_queue->resume();}
94  m_queue = iOther.m_queue;
95  iOther.m_queue = nullptr;
96  return *this;
97  }
98 
99  bool resume() {
100  if(m_queue) {
101  auto q = m_queue;
102  m_queue = nullptr;
103  return q->resume();
104  }
105  return false;
106  }
107  private:
108  Resumer(SerialTaskQueue* iQueue): m_queue{iQueue}{}
110  };
111 
113 
117  template<typename T>
118  void pushAndPause(T&& iAction);
119 
120  unsigned int concurrencyLimit() const { return m_queues.size(); }
121  private:
122  LimitedTaskQueue(const LimitedTaskQueue&) = delete;
123  const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
124 
125  // ---------- member data --------------------------------
126  std::vector<SerialTaskQueue> m_queues;
127  };
128 
129  template<typename T>
130  void LimitedTaskQueue::push(T&& iAction) {
131  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
132  for(auto& q: m_queues) {
133  q.push([set_to_run,iAction]() mutable{
134  bool expected = false;
135  if(set_to_run->compare_exchange_strong(expected,true)) {
136  iAction();
137  }
138  });
139  }
140  }
141 
142  template<typename T>
144  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
145  waitTask->set_ref_count(2);
146  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
147  for(auto& q: m_queues) {
148  q.push([set_to_run,waitTask,iAction]() mutable{
149  bool expected = false;
150  if(set_to_run->compare_exchange_strong(expected,true)) {
151  try {
152  iAction();
153  }catch(...) {}
154  waitTask->decrement_ref_count();
155  }
156  });
157  }
158  waitTask->wait_for_all();
159  tbb::task::destroy(*waitTask);
160  }
161 
162  template<typename T>
164  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
165  for(auto& q: m_queues) {
166  q.push([&q,set_to_run,iAction]() mutable{
167  bool expected = false;
168  if(set_to_run->compare_exchange_strong(expected,true)) {
169  q.pause();
170  iAction(Resumer(&q));
171  }
172  });
173  }
174  }
175 
176 }
177 
178 #endif
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Resumer(Resumer const &iOther)
def destroy(e)
Definition: pyrootRender.py:14
unsigned int concurrencyLimit() const
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
bool resume()
Resumes processing if the queue was paused.
std::vector< SerialTaskQueue > m_queues
Resumer & operator=(Resumer &&iOther)
void pushAndPause(T &&iAction)
asynchronously pushes functor iAction into queue then pause the queue and run iAction ...
LimitedTaskQueue(unsigned int iLimit)
Resumer & operator=(Resumer const &iOther)
Resumer(SerialTaskQueue *iQueue)
HLT enums.
long double T
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511