CMS 3D CMS Logo

LimitedTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
2 #define FWCore_Concurrency_LimitedTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : LimitedTaskQueue
7 //
21 //
22 // Original Author: Chris Jones
23 // Created: Thu Feb 21 11:14:39 CST 2013
24 // $Id$
25 //
26 
27 // system include files
28 #include <atomic>
29 #include <vector>
30 #include <memory>
31 
34 
35 // user include files
36 
37 // forward declarations
38 namespace edm {
40  public:
41  LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
42  LimitedTaskQueue(const LimitedTaskQueue&) = delete;
43  const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
44 
45  // ---------- member functions ---------------------------
46 
48 
54  template <typename T>
55  void push(T&& iAction);
56 
58 
65  template <typename T>
66  void pushAndWait(T&& iAction);
67 
68  class Resumer {
69  public:
70  friend class LimitedTaskQueue;
71 
72  Resumer() = default;
73  ~Resumer() { resume(); }
74 
75  Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }
76 
77  Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
78  if (m_queue) {
79  m_queue->pause();
80  }
81  }
82 
83  Resumer& operator=(Resumer const& iOther) {
84  auto t = iOther;
85  return (*this = std::move(t));
86  }
87  Resumer& operator=(Resumer&& iOther) {
88  if (m_queue) {
89  m_queue->resume();
90  }
91  m_queue = iOther.m_queue;
92  iOther.m_queue = nullptr;
93  return *this;
94  }
95 
96  bool resume() {
97  if (m_queue) {
98  auto q = m_queue;
99  m_queue = nullptr;
100  return q->resume();
101  }
102  return false;
103  }
104 
105  private:
106  Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
108  };
109 
111 
115  template <typename T>
116  void pushAndPause(T&& iAction);
117 
118  unsigned int concurrencyLimit() const { return m_queues.size(); }
119 
120  private:
121  // ---------- member data --------------------------------
122  std::vector<SerialTaskQueue> m_queues;
123  };
124 
125  template <typename T>
126  void LimitedTaskQueue::push(T&& iAction) {
127  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
128  for (auto& q : m_queues) {
129  q.push([set_to_run, iAction]() mutable {
130  bool expected = false;
131  if (set_to_run->compare_exchange_strong(expected, true)) {
132  iAction();
133  }
134  });
135  }
136  }
137 
138  template <typename T>
140  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
141  waitTask->set_ref_count(2);
142  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
143  for (auto& q : m_queues) {
144  q.push([set_to_run, waitTask, iAction]() mutable {
145  bool expected = false;
146  if (set_to_run->compare_exchange_strong(expected, true)) {
147  // Exception needs to be caught in order to decrease the waitTask reference count at the end. The user of SerialTaskQueue should handle exceptions within iAction.
148  CMS_SA_ALLOW try { iAction(); } catch (...) {
149  }
150  waitTask->decrement_ref_count();
151  }
152  });
153  }
154  waitTask->wait_for_all();
155  tbb::task::destroy(*waitTask);
156  }
157 
158  template <typename T>
160  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
161  for (auto& q : m_queues) {
162  q.push([&q, set_to_run, iAction]() mutable {
163  bool expected = false;
164  if (set_to_run->compare_exchange_strong(expected, true)) {
165  q.pause();
166  iAction(Resumer(&q));
167  }
168  });
169  }
170  }
171 
172 } // namespace edm
173 
174 #endif
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(Resumer &&iOther)
Definition: LimitedTaskQueue.h:75
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
edm::LimitedTaskQueue::operator=
const LimitedTaskQueue & operator=(const LimitedTaskQueue &)=delete
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::LimitedTaskQueue::Resumer::Resumer
Resumer()=default
edm::LimitedTaskQueue
Definition: LimitedTaskQueue.h:39
SerialTaskQueue.h
edm::LimitedTaskQueue::m_queues
std::vector< SerialTaskQueue > m_queues
Definition: LimitedTaskQueue.h:122
edm::LimitedTaskQueue::pushAndWait
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:139
LimitedTaskQueue
edm::LimitedTaskQueue::LimitedTaskQueue
LimitedTaskQueue(unsigned int iLimit)
Definition: LimitedTaskQueue.h:41
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
edm::LimitedTaskQueue::concurrencyLimit
unsigned int concurrencyLimit() const
Definition: LimitedTaskQueue.h:118
thread_safety_macros.h
edm::LimitedTaskQueue::Resumer::operator=
Resumer & operator=(Resumer &&iOther)
Definition: LimitedTaskQueue.h:87
submitPVResolutionJobs.q
q
Definition: submitPVResolutionJobs.py:84
edm::LimitedTaskQueue::Resumer::m_queue
SerialTaskQueue * m_queue
Definition: LimitedTaskQueue.h:107
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:68
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::LimitedTaskQueue::pushAndPause
void pushAndPause(T &&iAction)
asynchronously pushes functor iAction into queue then pause the queue and run iAction
Definition: LimitedTaskQueue.h:159
T
long double T
Definition: Basic3DVectorLD.h:48
edm::LimitedTaskQueue::Resumer::operator=
Resumer & operator=(Resumer const &iOther)
Definition: LimitedTaskQueue.h:83
edm::LimitedTaskQueue::Resumer::resume
bool resume()
Definition: LimitedTaskQueue.h:96
edm::LimitedTaskQueue::Resumer::~Resumer
~Resumer()
Definition: LimitedTaskQueue.h:73
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(SerialTaskQueue *iQueue)
Definition: LimitedTaskQueue.h:106
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(Resumer const &iOther)
Definition: LimitedTaskQueue.h:77
edm::LimitedTaskQueue::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:126