CMS 3D CMS Logo

LimitedTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
2 #define FWCore_Concurrency_LimitedTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : LimitedTaskQueue
7 //
21 //
22 // Original Author: Chris Jones
23 // Created: Thu Feb 21 11:14:39 CST 2013
24 // $Id$
25 //
26 
27 // system include files
28 #include <atomic>
29 #include <vector>
30 #include <memory>
31 
34 
35 // user include files
36 
37 // forward declarations
38 namespace edm {
40  public:
41  LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
42 
43  // ---------- member functions ---------------------------
44 
46 
52  template <typename T>
53  void push(T&& iAction);
54 
56 
63  template <typename T>
64  void pushAndWait(T&& iAction);
65 
66  class Resumer {
67  public:
68  friend class LimitedTaskQueue;
69 
70  Resumer() = default;
71  ~Resumer() { resume(); }
72 
73  Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }
74 
75  Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
76  if (m_queue) {
77  m_queue->pause();
78  }
79  }
80 
81  Resumer& operator=(Resumer const& iOther) {
82  auto t = iOther;
83  return (*this = std::move(t));
84  }
85  Resumer& operator=(Resumer&& iOther) {
86  if (m_queue) {
87  m_queue->resume();
88  }
89  m_queue = iOther.m_queue;
90  iOther.m_queue = nullptr;
91  return *this;
92  }
93 
94  bool resume() {
95  if (m_queue) {
96  auto q = m_queue;
97  m_queue = nullptr;
98  return q->resume();
99  }
100  return false;
101  }
102 
103  private:
104  Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
106  };
107 
109 
113  template <typename T>
114  void pushAndPause(T&& iAction);
115 
116  unsigned int concurrencyLimit() const { return m_queues.size(); }
117 
118  private:
119  LimitedTaskQueue(const LimitedTaskQueue&) = delete;
120  const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
121 
122  // ---------- member data --------------------------------
123  std::vector<SerialTaskQueue> m_queues;
124  };
125 
126  template <typename T>
127  void LimitedTaskQueue::push(T&& iAction) {
128  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
129  for (auto& q : m_queues) {
130  q.push([set_to_run, iAction]() mutable {
131  bool expected = false;
132  if (set_to_run->compare_exchange_strong(expected, true)) {
133  iAction();
134  }
135  });
136  }
137  }
138 
139  template <typename T>
141  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
142  waitTask->set_ref_count(2);
143  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
144  for (auto& q : m_queues) {
145  q.push([set_to_run, waitTask, iAction]() mutable {
146  bool expected = false;
147  if (set_to_run->compare_exchange_strong(expected, true)) {
148  // Exception needs to be caught in order to decrease the waitTask reference count at the end. The user of SerialTaskQueue should handle exceptions within iAction.
149  CMS_SA_ALLOW try { iAction(); } catch (...) {
150  }
151  waitTask->decrement_ref_count();
152  }
153  });
154  }
155  waitTask->wait_for_all();
156  tbb::task::destroy(*waitTask);
157  }
158 
159  template <typename T>
161  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
162  for (auto& q : m_queues) {
163  q.push([&q, set_to_run, iAction]() mutable {
164  bool expected = false;
165  if (set_to_run->compare_exchange_strong(expected, true)) {
166  q.pause();
167  iAction(Resumer(&q));
168  }
169  });
170  }
171  }
172 
173 } // namespace edm
174 
175 #endif
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(Resumer &&iOther)
Definition: LimitedTaskQueue.h:73
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
data-class-funcs.q
q
Definition: data-class-funcs.py:169
edm::LimitedTaskQueue::operator=
const LimitedTaskQueue & operator=(const LimitedTaskQueue &)=delete
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::LimitedTaskQueue::Resumer::Resumer
Resumer()=default
edm::LimitedTaskQueue
Definition: LimitedTaskQueue.h:39
SerialTaskQueue.h
edm::LimitedTaskQueue::m_queues
std::vector< SerialTaskQueue > m_queues
Definition: LimitedTaskQueue.h:123
edm::LimitedTaskQueue::pushAndWait
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:140
OrderedSet.t
t
Definition: OrderedSet.py:90
edm::LimitedTaskQueue::LimitedTaskQueue
LimitedTaskQueue(unsigned int iLimit)
Definition: LimitedTaskQueue.h:41
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:96
edm::LimitedTaskQueue::concurrencyLimit
unsigned int concurrencyLimit() const
Definition: LimitedTaskQueue.h:116
thread_safety_macros.h
edm::LimitedTaskQueue::Resumer::operator=
Resumer & operator=(Resumer &&iOther)
Definition: LimitedTaskQueue.h:85
edm::LimitedTaskQueue::Resumer::m_queue
SerialTaskQueue * m_queue
Definition: LimitedTaskQueue.h:105
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:66
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::LimitedTaskQueue::pushAndPause
void pushAndPause(T &&iAction)
asynchronously pushes functor iAction into queue then pause the queue and run iAction
Definition: LimitedTaskQueue.h:160
T
long double T
Definition: Basic3DVectorLD.h:48
edm::LimitedTaskQueue::Resumer::operator=
Resumer & operator=(Resumer const &iOther)
Definition: LimitedTaskQueue.h:81
edm::LimitedTaskQueue::Resumer::resume
bool resume()
Definition: LimitedTaskQueue.h:94
edm::LimitedTaskQueue::Resumer::~Resumer
~Resumer()
Definition: LimitedTaskQueue.h:71
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(SerialTaskQueue *iQueue)
Definition: LimitedTaskQueue.h:104
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(Resumer const &iOther)
Definition: LimitedTaskQueue.h:75
edm::LimitedTaskQueue::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:127