CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
LimitedTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
2 #define FWCore_Concurrency_LimitedTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : LimitedTaskQueue
7 //
21 //
22 // Original Author: Chris Jones
23 // Created: Thu Feb 21 11:14:39 CST 2013
24 // $Id$
25 //
26 
27 // system include files
28 #include <atomic>
29 #include <vector>
30 #include <memory>
31 
34 
35 // user include files
36 
37 // forward declarations
38 namespace edm {
40  public:
41  LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
42  LimitedTaskQueue(const LimitedTaskQueue&) = delete;
43  const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
44 
45  // ---------- member functions ---------------------------
46 
48 
54  template <typename T>
55  void push(oneapi::tbb::task_group& iGroup, T&& iAction);
56 
57  class Resumer {
58  public:
59  friend class LimitedTaskQueue;
60 
61  Resumer() = default;
62  ~Resumer() { resume(); }
63 
64  Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }
65 
66  Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
67  if (m_queue) {
68  m_queue->pause();
69  }
70  }
71 
72  Resumer& operator=(Resumer const& iOther) {
73  auto t = iOther;
74  return (*this = std::move(t));
75  }
76  Resumer& operator=(Resumer&& iOther) {
77  if (m_queue) {
78  m_queue->resume();
79  }
80  m_queue = iOther.m_queue;
81  iOther.m_queue = nullptr;
82  return *this;
83  }
84 
85  bool resume() {
86  if (m_queue) {
87  auto q = m_queue;
88  m_queue = nullptr;
89  return q->resume();
90  }
91  return false;
92  }
93 
94  private:
95  Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
97  };
98 
100 
104  template <typename T>
105  void pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction);
106 
107  unsigned int concurrencyLimit() const { return m_queues.size(); }
108 
109  private:
110  // ---------- member data --------------------------------
111  std::vector<SerialTaskQueue> m_queues;
112  };
113 
114  template <typename T>
115  void LimitedTaskQueue::push(oneapi::tbb::task_group& iGroup, T&& iAction) {
116  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
117  for (auto& q : m_queues) {
118  q.push(iGroup, [set_to_run, iAction]() mutable {
119  bool expected = false;
120  if (set_to_run->compare_exchange_strong(expected, true)) {
121  iAction();
122  }
123  });
124  }
125  }
126 
127  template <typename T>
128  void LimitedTaskQueue::pushAndPause(oneapi::tbb::task_group& iGroup, T&& iAction) {
129  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
130  for (auto& q : m_queues) {
131  q.push(iGroup, [&q, set_to_run, iAction]() mutable {
132  bool expected = false;
133  if (set_to_run->compare_exchange_strong(expected, true)) {
134  q.pause();
135  iAction(Resumer(&q));
136  }
137  });
138  }
139  }
140 
141 } // namespace edm
142 
143 #endif
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Resumer(Resumer const &iOther)
unsigned int concurrencyLimit() const
const LimitedTaskQueue & operator=(const LimitedTaskQueue &)=delete
bool resume()
Resumes processing if the queue was paused.
std::vector< SerialTaskQueue > m_queues
def move
Definition: eostools.py:511
void pushAndPause(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into the queue, then pauses the queue and runs iAction ...
Resumer & operator=(Resumer &&iOther)
LimitedTaskQueue(unsigned int iLimit)
Resumer & operator=(Resumer const &iOther)
Resumer(SerialTaskQueue *iQueue)
long double T
bool pause()
Pauses processing of additional tasks from the queue.