CMS 3D CMS Logo

ControllerChannel.h
Go to the documentation of this file.
1 #ifndef FWCore_SharedMemory_ControllerChannel_h
2 #define FWCore_SharedMemory_ControllerChannel_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/SharedMemory
6 // Class : ControllerChannel
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: 21/01/2020
19 //
20 
21 // system include files
22 #include <string>
23 #include <iostream>
24 #include "boost/interprocess/managed_shared_memory.hpp"
25 #include "boost/interprocess/sync/named_mutex.hpp"
26 #include "boost/interprocess/sync/named_condition.hpp"
27 #include "boost/interprocess/sync/scoped_lock.hpp"
28 #include "boost/date_time/posix_time/posix_time_types.hpp"
29 
30 // user include files
34 
35 // forward declarations
36 
37 namespace edm::shared_memory {
39  public:
43  ControllerChannel(std::string const& iName, int iID, unsigned int iMaxWaitInSeconds);
45  ControllerChannel(const ControllerChannel&) = delete;
46  const ControllerChannel& operator=(const ControllerChannel&) = delete;
49 
50  // ---------- member functions ---------------------------
51 
// Runs iF while holding mutex_ and then blocks in wait(lock) until the external
// process responds.
//  iF: callable taking no arguments; it is invoked exactly once, after the lock
//      on mutex_ has been taken and before waiting.
// If wait(lock) returns false, *stop_ is set to true and an error message that
// reports maxWaitInSeconds_ is built.
56  template <typename F>
57  void setupWorker(F&& iF) {
58  using namespace boost::interprocess;
59  scoped_lock<named_mutex> lock(mutex_);
60  iF();
61  using namespace boost::posix_time;
62  //std::cout << id_ << " waiting for external process" << std::endl;
63  if (not wait(lock)) {
64  //std::cout << id_ << " FAILED waiting for external process" << std::endl;
65  *stop_ = true;
// NOTE(review): this listing jumps from source line 65 to 67, so the expression
// the `<<` continuation below belongs to is not visible here -- presumably an
// exception being thrown; confirm against the real header.
67  << "Failed waiting for external process while setting up the process. Timed out after " << maxWaitInSeconds_
68  << " seconds.";
69  } else {
70  //std::cout << id_ << " done waiting for external process" << std::endl;
71  }
72  }
73 
// Variant of setupWorker that asks the caller whether to keep waiting.
//  iF:     callable taking no arguments; invoked once while mutex_ is held.
//  iRetry: callable taking no arguments whose result is tested as a bool; it is
//          consulted each time wait(lock) returns false.
// The loop ends as soon as wait(lock) returns true. When iRetry() yields false,
// *stop_ is set to true and an error message reporting maxWaitInSeconds_ and
// the number of retries so far is built.
77  template <typename F, typename FRETRY>
78  void setupWorkerWithRetry(F&& iF, FRETRY&& iRetry) {
79  using namespace boost::interprocess;
80  scoped_lock<named_mutex> lock(mutex_);
81  iF();
82  using namespace boost::posix_time;
83  //std::cout << id_ << " waiting for external process" << std::endl;
84  bool shouldContinue = true;
// Number of times the loop has gone around again after a failed wait(lock).
85  long long int retryCount = 0;
86  do {
87  if (not wait(lock)) {
88  if (not iRetry()) {
89  *stop_ = true;
// NOTE(review): this listing jumps from source line 89 to 91, so the expression
// the `<<` continuation below belongs to is not visible here -- presumably an
// exception being thrown; confirm against the real header.
91  << "Failed waiting for external process while setting up the process. Timed out after "
92  << maxWaitInSeconds_ << " seconds with " << retryCount << " retries.";
93  }
94  //std::cerr<<"retrying\n";
95  ++retryCount;
96  } else {
97  shouldContinue = false;
98  }
99  } while (shouldContinue);
100  }
101 
// Takes mutex_, calls wait(lock, iTrans, iTransitionID) and, only if that
// succeeds, runs iF.
//  iF:            callable taking no arguments; skipped entirely when the wait fails.
//  iTrans:        transition passed through to wait().
//  iTransitionID: identifier passed through to wait().
// Returns false if wait() returned false, true after iF has run.
102  template <typename F>
103  bool doTransition(F&& iF, edm::Transition iTrans, unsigned long long iTransitionID) {
104  using namespace boost::interprocess;
105 
106  //std::cout << id_ << " taking from lock" << std::endl;
107  scoped_lock<named_mutex> lock(mutex_);
108  if (not wait(lock, iTrans, iTransitionID)) {
109  return false;
110  }
111  //std::cout <<id_<<"running doTransition command"<<std::endl;
112  iF();
113  return true;
114  }
115 
// Variant of doTransition that lets the caller decide whether to keep waiting.
//  iF:            callable taking no arguments; run once a wait has succeeded.
//  iRetry:        callable taking no arguments whose result is tested as a bool;
//                 consulted after every failed wait.
//  iTrans:        transition passed through to the initial wait().
//  iTransitionID: identifier passed through to the initial wait().
// Returns false as soon as iRetry() yields false (iF is then never called),
// true after iF has run.
116  template <typename F, typename FRETRY>
117  bool doTransitionWithRetry(F&& iF, FRETRY&& iRetry, edm::Transition iTrans, unsigned long long iTransitionID) {
118  using namespace boost::interprocess;
119 
120  //std::cout << id_ << " taking from lock" << std::endl;
121  scoped_lock<named_mutex> lock(mutex_);
122  if (not wait(lock, iTrans, iTransitionID)) {
123  if (not iRetry()) {
124  return false;
125  }
// The first wait failed but a retry was granted: from here on only
// continueWait(lock) is called, not wait(lock, iTrans, iTransitionID) again.
126  bool shouldContinue = true;
127  do {
128  using namespace boost::posix_time;
129  if (not continueWait(lock)) {
130  if (not iRetry()) {
131  return false;
132  }
133  } else {
134  shouldContinue = false;
135  }
136  } while (shouldContinue);
137  }
138  //std::cout <<id_<<"running doTransition command"<<std::endl;
139  iF();
140  return true;
141  }
142 
147 
// While holding mutex_, sets *stop_ to true and then wakes every waiter on
// cndFromMain_ via notify_all().
148  void stopWorker() {
149  //std::cout <<"stopWorker"<<std::endl;
150  using namespace boost::interprocess;
151  scoped_lock<named_mutex> lock(mutex_);
152  *stop_ = true;
153  //std::cout <<"stopWorker sending notification"<<std::endl;
154  cndFromMain_.notify_all();
155  }
156 
157  // ---------- const member functions ---------------------------
// Returns smName_ by const reference (no copy is made).
158  std::string const& sharedMemoryName() const { return smName_; }
// Returns the result of uniqueName("") -- i.e. the unique name built from an empty base string.
159  std::string uniqueID() const { return uniqueName(""); }
160 
161  //should only be called after calling `doTransition`
// Returns the current value of the bool that keepEvent_ points to.
162  bool shouldKeepEvent() const { return *keepEvent_; }
163 
// Returns maxWaitInSeconds_, the value reported in the timeout messages built by
// setupWorker and setupWorkerWithRetry.
164  unsigned int maxWaitInSeconds() const noexcept { return maxWaitInSeconds_; }
165 
166  private:
// NOTE(review): this listing skips source line 167, so the struct's opening line
// is not visible here; judging by the return type of initCheckWorkerStatus this
// is presumably the body of CheckWorkerStatus -- confirm against the real header.
// Value recorded when the status object was created.
168  const unsigned long long initValue_;
// Location that is re-read on every workerFinished() call.
169  const unsigned long long* ptr_;
170 
// True once the value currently at *ptr_ differs from the recorded initValue_.
171  [[nodiscard]] bool workerFinished() const noexcept { return initValue_ != *ptr_; }
172  };
173 
// Builds a CheckWorkerStatus from the value currently stored at *iPtr and from
// iPtr itself, in that order.
//  iPtr: dereferenced immediately, so it must point to a readable value.
174  [[nodiscard]] CheckWorkerStatus initCheckWorkerStatus(unsigned long long* iPtr) const noexcept {
175  return {*iPtr, iPtr};
176  }
177 
178  static BufferInfo* bufferInfo(const char* iWhich, boost::interprocess::managed_shared_memory& mem);
179 
180  std::string uniqueName(std::string iBase) const;
181 
// Waiting primitives; all take the caller's already-acquired lock on mutex_ by
// reference. Their definitions are not part of this header.
// Transition-aware wait used by doTransition/doTransitionWithRetry, which skip
// their callback (or ask for a retry) when it returns false.
182  bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock,
183  edm::Transition iTrans,
184  unsigned long long iTransID);
// Wait used by setupWorker/setupWorkerWithRetry, which report a timeout when it
// returns false.
185  bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
// Called by doTransitionWithRetry after an initial wait() failed and a retry was
// granted -- presumably resumes the same wait without re-sending the transition;
// TODO confirm against the implementation.
186  bool continueWait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
187 
188  // ---------- member data --------------------------------
189  int id_;
190  unsigned int maxWaitInSeconds_;
192  struct SMORemover {
193  //handle removing the shared memory object from the system even
194  // if an exception happens during construction
195  SMORemover(const std::string& iName) : m_name(iName) {
196  //remove an object which was left from a previous failed job
// NOTE(review): this listing skips source lines 197 and 199 -- presumably the
// removal call announced by the comment above and the struct's destructor;
// confirm against the real header.
198  }
200  //ControllerChannel passes in smName_ so it owns the string
// m_name is a reference, not a copy: the string handed to the constructor must
// outlive this object.
201  std::string const& m_name;
202  } smRemover_;
203  boost::interprocess::managed_shared_memory managed_sm_;
206 
// Holds the name it is constructed with. Unlike SMORemover, the name is taken by
// value and moved into m_name, so this struct owns its own copy of the string.
// NOTE(review): this listing skips source lines 209 and 211 -- presumably the
// constructor body and a destructor that act on the named mutex; confirm against
// the real header.
207  struct MutexRemover {
208  MutexRemover(std::string iName) : m_name(std::move(iName)) {
210  }
212  std::string const m_name;
213  };
215  boost::interprocess::named_mutex mutex_;
216 
// NOTE(review): this listing skips source line 217 (the struct's opening line)
// as well as lines 219 and 221 -- presumably the constructor body and a
// destructor that act on the named condition; confirm against the real header.
// The constructor takes the name by value and moves it into m_name, so the
// object owns its own copy of the string.
218  ConditionRemover(std::string iName) : m_name(std::move(iName)) {
220  }
222  std::string const m_name;
223  };
224 
226  boost::interprocess::named_condition cndFromMain_;
227 
229  boost::interprocess::named_condition cndToMain_;
230 
232  unsigned long long* transitionID_;
233  bool* stop_;
234  bool* keepEvent_;
235  };
236 } // namespace edm::shared_memory
237 
238 #endif
void setupWorkerWithRetry(F &&iF, FRETRY &&iRetry)
BufferInfo * toWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
boost::interprocess::named_condition cndToMain_
struct edm::shared_memory::ControllerChannel::SMORemover smRemover_
const char * ptr_
Definition: DataKey.cc:76
bool wait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock, edm::Transition iTrans, unsigned long long iTransID)
unsigned int maxWaitInSeconds() const noexcept
std::string const & sharedMemoryName() const
BufferInfo * fromWorkerBufferInfo()
This can be used with ReadBuffer to keep Controller and Worker in sync.
boost::interprocess::named_mutex mutex_
Transition
Definition: Transition.h:12
uint16_t mem[nChs][nEvts]
CheckWorkerStatus initCheckWorkerStatus(unsigned long long *iPtr) const noexcept
std::string uniqueName(std::string iBase) const
const ControllerChannel & operator=(const ControllerChannel &)=delete
boost::interprocess::named_condition cndFromMain_
ControllerChannel(std::string const &iName, int iID, unsigned int iMaxWaitInSeconds)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
boost::interprocess::managed_shared_memory managed_sm_
bool continueWait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock)
static BufferInfo * bufferInfo(const char *iWhich, boost::interprocess::managed_shared_memory &mem)
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
def move(src, dest)
Definition: eostools.py:511
bool doTransition(F &&iF, edm::Transition iTrans, unsigned long long iTransitionID)
bool doTransitionWithRetry(F &&iF, FRETRY &&iRetry, edm::Transition iTrans, unsigned long long iTransitionID)