CMS 3D CMS Logo

LimitedTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
2 #define FWCore_Concurrency_LimitedTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : LimitedTaskQueue
7 //
21 //
22 // Original Author: Chris Jones
23 // Created: Thu Feb 21 11:14:39 CST 2013
24 // $Id$
25 //
26 
27 // system include files
28 #include <atomic>
29 #include <vector>
30 #include <memory>
31 
33 
34 // user include files
35 
36 // forward declarations
37 namespace edm {
39  public:
40  LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
41 
42  // ---------- member functions ---------------------------
43 
45 
51  template <typename T>
52  void push(T&& iAction);
53 
55 
62  template <typename T>
63  void pushAndWait(T&& iAction);
64 
65  class Resumer {
66  public:
67  friend class LimitedTaskQueue;
68 
69  Resumer() = default;
70  ~Resumer() { resume(); }
71 
72  Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }
73 
74  Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
75  if (m_queue) {
76  m_queue->pause();
77  }
78  }
79 
80  Resumer& operator=(Resumer const& iOther) {
81  auto t = iOther;
82  return (*this = std::move(t));
83  }
84  Resumer& operator=(Resumer&& iOther) {
85  if (m_queue) {
86  m_queue->resume();
87  }
88  m_queue = iOther.m_queue;
89  iOther.m_queue = nullptr;
90  return *this;
91  }
92 
93  bool resume() {
94  if (m_queue) {
95  auto q = m_queue;
96  m_queue = nullptr;
97  return q->resume();
98  }
99  return false;
100  }
101 
102  private:
103  Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
105  };
106 
108 
112  template <typename T>
113  void pushAndPause(T&& iAction);
114 
115  unsigned int concurrencyLimit() const { return m_queues.size(); }
116 
117  private:
118  LimitedTaskQueue(const LimitedTaskQueue&) = delete;
119  const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
120 
121  // ---------- member data --------------------------------
122  std::vector<SerialTaskQueue> m_queues;
123  };
124 
125  template <typename T>
126  void LimitedTaskQueue::push(T&& iAction) {
127  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
128  for (auto& q : m_queues) {
129  q.push([set_to_run, iAction]() mutable {
130  bool expected = false;
131  if (set_to_run->compare_exchange_strong(expected, true)) {
132  iAction();
133  }
134  });
135  }
136  }
137 
138  template <typename T>
140  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
141  waitTask->set_ref_count(2);
142  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
143  for (auto& q : m_queues) {
144  q.push([set_to_run, waitTask, iAction]() mutable {
145  bool expected = false;
146  if (set_to_run->compare_exchange_strong(expected, true)) {
147  try {
148  iAction();
149  } catch (...) {
150  }
151  waitTask->decrement_ref_count();
152  }
153  });
154  }
155  waitTask->wait_for_all();
156  tbb::task::destroy(*waitTask);
157  }
158 
159  template <typename T>
161  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
162  for (auto& q : m_queues) {
163  q.push([&q, set_to_run, iAction]() mutable {
164  bool expected = false;
165  if (set_to_run->compare_exchange_strong(expected, true)) {
166  q.pause();
167  iAction(Resumer(&q));
168  }
169  });
170  }
171  }
172 
173 } // namespace edm
174 
175 #endif
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Resumer(Resumer const &iOther)
unsigned int concurrencyLimit() const
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
bool resume()
Resumes processing if the queue was paused.
std::vector< SerialTaskQueue > m_queues
Resumer & operator=(Resumer &&iOther)
void pushAndPause(T &&iAction)
Asynchronously pushes functor iAction into the queue, then pauses the queue and runs iAction ...
LimitedTaskQueue(unsigned int iLimit)
Resumer & operator=(Resumer const &iOther)
Resumer(SerialTaskQueue *iQueue)
HLT enums.
long double T
bool pause()
Pauses processing of additional tasks from the queue.
def move(src, dest)
Definition: eostools.py:511