CMS 3D CMS Logo

LimitedTaskQueue.h
Go to the documentation of this file.
1 #ifndef FWCore_Concurrency_LimitedTaskQueue_h
2 #define FWCore_Concurrency_LimitedTaskQueue_h
3 // -*- C++ -*-
4 //
5 // Package: Concurrency
6 // Class : LimitedTaskQueue
7 //
21 //
22 // Original Author: Chris Jones
23 // Created: Thu Feb 21 11:14:39 CST 2013
24 // $Id$
25 //
26 
27 // system include files
28 #include <atomic>
29 #include <vector>
30 #include <memory>
31 
34 
35 // user include files
36 
37 // forward declarations
38 namespace edm {
40  public:
41  LimitedTaskQueue(unsigned int iLimit) : m_queues{iLimit} {}
42  LimitedTaskQueue(const LimitedTaskQueue&) = delete;
43  const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;
44 
45  // ---------- member functions ---------------------------
46 
48 
54  template <typename T>
55  void push(tbb::task_group& iGroup, T&& iAction);
56 
57  class Resumer {
58  public:
59  friend class LimitedTaskQueue;
60 
61  Resumer() = default;
62  ~Resumer() { resume(); }
63 
64  Resumer(Resumer&& iOther) : m_queue(iOther.m_queue) { iOther.m_queue = nullptr; }
65 
66  Resumer(Resumer const& iOther) : m_queue(iOther.m_queue) {
67  if (m_queue) {
68  m_queue->pause();
69  }
70  }
71 
72  Resumer& operator=(Resumer const& iOther) {
73  auto t = iOther;
74  return (*this = std::move(t));
75  }
76  Resumer& operator=(Resumer&& iOther) {
77  if (m_queue) {
78  m_queue->resume();
79  }
80  m_queue = iOther.m_queue;
81  iOther.m_queue = nullptr;
82  return *this;
83  }
84 
85  bool resume() {
86  if (m_queue) {
87  auto q = m_queue;
88  m_queue = nullptr;
89  return q->resume();
90  }
91  return false;
92  }
93 
94  private:
95  Resumer(SerialTaskQueue* iQueue) : m_queue{iQueue} {}
97  };
98 
100 
104  template <typename T>
105  void pushAndPause(tbb::task_group& iGroup, T&& iAction);
106 
107  unsigned int concurrencyLimit() const { return m_queues.size(); }
108 
109  private:
110  // ---------- member data --------------------------------
111  std::vector<SerialTaskQueue> m_queues;
112  };
113 
114  template <typename T>
115  void LimitedTaskQueue::push(tbb::task_group& iGroup, T&& iAction) {
116  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
117  for (auto& q : m_queues) {
118  q.push(iGroup, [set_to_run, iAction]() mutable {
119  bool expected = false;
120  if (set_to_run->compare_exchange_strong(expected, true)) {
121  iAction();
122  }
123  });
124  }
125  }
126 
127  template <typename T>
128  void LimitedTaskQueue::pushAndPause(tbb::task_group& iGroup, T&& iAction) {
129  auto set_to_run = std::make_shared<std::atomic<bool>>(false);
130  for (auto& q : m_queues) {
131  q.push(iGroup, [&q, set_to_run, iAction]() mutable {
132  bool expected = false;
133  if (set_to_run->compare_exchange_strong(expected, true)) {
134  q.pause();
135  iAction(Resumer(&q));
136  }
137  });
138  }
139  }
140 
141 } // namespace edm
142 
143 #endif
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(Resumer &&iOther)
Definition: LimitedTaskQueue.h:64
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edm::LimitedTaskQueue::operator=
const LimitedTaskQueue & operator=(const LimitedTaskQueue &)=delete
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
edm::LimitedTaskQueue::Resumer::Resumer
Resumer()=default
edm::LimitedTaskQueue
Definition: LimitedTaskQueue.h:39
SerialTaskQueue.h
edm::LimitedTaskQueue::m_queues
std::vector< SerialTaskQueue > m_queues
Definition: LimitedTaskQueue.h:111
LimitedTaskQueue
edm::LimitedTaskQueue::LimitedTaskQueue
LimitedTaskQueue(unsigned int iLimit)
Definition: LimitedTaskQueue.h:41
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
edm::LimitedTaskQueue::concurrencyLimit
unsigned int concurrencyLimit() const
Definition: LimitedTaskQueue.h:107
edm::LimitedTaskQueue::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:115
thread_safety_macros.h
edm::LimitedTaskQueue::Resumer::operator=
Resumer & operator=(Resumer &&iOther)
Definition: LimitedTaskQueue.h:76
edm::LimitedTaskQueue::pushAndPause
void pushAndPause(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue then pause the queue and run iAction
Definition: LimitedTaskQueue.h:128
submitPVResolutionJobs.q
q
Definition: submitPVResolutionJobs.py:84
edm::LimitedTaskQueue::Resumer::m_queue
SerialTaskQueue * m_queue
Definition: LimitedTaskQueue.h:96
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:57
eostools.move
def move(src, dest)
Definition: eostools.py:511
T
long double T
Definition: Basic3DVectorLD.h:48
edm::LimitedTaskQueue::Resumer::operator=
Resumer & operator=(Resumer const &iOther)
Definition: LimitedTaskQueue.h:72
edm::LimitedTaskQueue::Resumer::resume
bool resume()
Definition: LimitedTaskQueue.h:85
edm::LimitedTaskQueue::Resumer::~Resumer
~Resumer()
Definition: LimitedTaskQueue.h:62
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(SerialTaskQueue *iQueue)
Definition: LimitedTaskQueue.h:95
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::LimitedTaskQueue::Resumer::Resumer
Resumer(Resumer const &iOther)
Definition: LimitedTaskQueue.h:66