CMS 3D CMS Logo

ControllerChannel.h
Go to the documentation of this file.
1 #ifndef FWCore_SharedMemory_ControllerChannel_h
2 #define FWCore_SharedMemory_ControllerChannel_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/SharedMemory
6 // Class : ControllerChannel
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: 21/01/2020
19 //
20 
21 // system include files
22 #include <string>
23 #include <iostream>
24 #include "boost/interprocess/managed_shared_memory.hpp"
25 #include "boost/interprocess/sync/named_mutex.hpp"
26 #include "boost/interprocess/sync/named_condition.hpp"
27 #include "boost/interprocess/sync/scoped_lock.hpp"
28 
29 // user include files
33 
34 // forward declarations
35 
36 namespace edm::shared_memory {
38  public:
// Constructs a channel from a name, an integer ID and a maximum wait time
// (in seconds, going by the parameter name). The definition is not part of
// this listing; presumably iName and iID feed the names of the shared-memory
// and synchronization objects -- TODO confirm against the implementation file.
42  ControllerChannel(std::string const& iName, int iID, unsigned int iMaxWaitInSeconds);
// Copying is disabled: both copy construction and copy assignment are deleted.
44  ControllerChannel(const ControllerChannel&) = delete;
45  const ControllerChannel& operator=(const ControllerChannel&) = delete;
48 
49  // ---------- member functions ---------------------------
50 
// Runs iF while holding mutex_, then blocks in wait(lock) for the external
// process.
//   iF - callable invoked with no arguments, under the lock, before waiting.
// If wait(lock) returns false, *stop_ is set to true and a
// cms::Exception("ExternalFailed") is thrown whose message reports
// maxWaitInSeconds_ as the time-out. Nothing is returned on success.
55  template <typename F>
56  void setupWorker(F&& iF) {
57  using namespace boost::interprocess;
58  scoped_lock<named_mutex> lock(mutex_);  // released when `lock` leaves scope, including via the throw below
59  iF();
60  using namespace boost::posix_time;  // NOTE(review): no posix_time name is used in this body -- possibly a leftover
61  //std::cout << id_ << " waiting for external process" << std::endl;
62  if (not wait(lock)) {
63  //std::cout << id_ << " FAILED waiting for external process" << std::endl;
64  *stop_ = true;  // set the stop flag before reporting the failure; presumably visible to the worker -- TODO confirm
65  throw cms::Exception("ExternalFailed")
66  << "Failed waiting for external process while setting up the process. Timed out after " << maxWaitInSeconds_
67  << " seconds.";
68  } else {
69  //std::cout << id_ << " done waiting for external process" << std::endl;
70  }
71  }
72 
// Same setup sequence as setupWorker, but a failed wait does not fail
// immediately: iRetry decides whether to wait again.
//   iF     - callable invoked once with no arguments, under the lock, before
//            the first wait.
//   iRetry - callable invoked with no arguments each time wait(lock) returns
//            false; its result is used as a bool. A true result means wait
//            again, a false result means give up.
// On give-up, *stop_ is set to true and a cms::Exception("ExternalFailed") is
// thrown whose message reports maxWaitInSeconds_ and the number of retries
// performed so far.
76  template <typename F, typename FRETRY>
77  void setupWorkerWithRetry(F&& iF, FRETRY&& iRetry) {
78  using namespace boost::interprocess;
79  scoped_lock<named_mutex> lock(mutex_);  // held for the whole retry loop
80  iF();
81  using namespace boost::posix_time;  // NOTE(review): no posix_time name is used in this body -- possibly a leftover
82  //std::cout << id_ << " waiting for external process" << std::endl;
83  bool shouldContinue = true;
84  long long int retryCount = 0;  // only used for the exception message
85  do {
86  if (not wait(lock)) {
87  if (not iRetry()) {
88  *stop_ = true;
89  throw cms::Exception("ExternalFailed")
90  << "Failed waiting for external process while setting up the process. Timed out after "
91  << maxWaitInSeconds_ << " seconds with " << retryCount << " retries.";
92  }
93  //std::cerr<<"retrying\n";
94  ++retryCount;
95  } else {
96  shouldContinue = false;  // wait succeeded: leave the loop
97  }
98  } while (shouldContinue);
99  }
100 
// Takes mutex_, calls wait(lock, iTrans, iTransitionID) and, only if that
// succeeds, runs iF while still holding the lock.
//   iF            - callable invoked with no arguments after a successful wait.
//   iTrans        - transition kind, forwarded unchanged to wait().
//   iTransitionID - transition identifier, forwarded unchanged to wait().
// Returns false (without calling iF) when wait() returns false, true otherwise.
101  template <typename F>
102  bool doTransition(F&& iF, edm::Transition iTrans, unsigned long long iTransitionID) {
103  using namespace boost::interprocess;
104
105  //std::cout << id_ << " taking from lock" << std::endl;
106  scoped_lock<named_mutex> lock(mutex_);
107  if (not wait(lock, iTrans, iTransitionID)) {
108  return false;
109  }
110  //std::cout <<id_<<"running doTransition command"<<std::endl;
111  iF();
112  return true;
113  }
114 
// Variant of doTransition that consults iRetry instead of failing on the
// first unsuccessful wait.
//   iF            - callable invoked with no arguments once a wait succeeds.
//   iRetry        - callable invoked with no arguments after every failed
//                   wait; a false result aborts and makes this function
//                   return false.
//   iTrans        - transition kind, forwarded to the initial wait().
//   iTransitionID - transition identifier, forwarded to the initial wait().
// The initial attempt uses wait(lock, iTrans, iTransitionID); every later
// attempt uses continueWait(lock), which does not receive the transition
// arguments. Returns true after iF has run, false if iRetry declined.
115  template <typename F, typename FRETRY>
116  bool doTransitionWithRetry(F&& iF, FRETRY&& iRetry, edm::Transition iTrans, unsigned long long iTransitionID) {
117  using namespace boost::interprocess;
118
119  //std::cout << id_ << " taking from lock" << std::endl;
120  scoped_lock<named_mutex> lock(mutex_);
121  if (not wait(lock, iTrans, iTransitionID)) {
122  if (not iRetry()) {
123  return false;
124  }
125  bool shouldContinue = true;
126  do {
127  using namespace boost::posix_time;  // NOTE(review): no posix_time name is used in this loop -- possibly a leftover
128  if (not continueWait(lock)) {
129  if (not iRetry()) {
130  return false;
131  }
132  } else {
133  shouldContinue = false;  // continueWait succeeded: proceed to iF()
134  }
135  } while (shouldContinue);
136  }
137  //std::cout <<id_<<"running doTransition command"<<std::endl;
138  iF();
139  return true;
140  }
141 
146 
// Sets the stop flag and wakes everything waiting on cndFromMain_.
// The flag is written while mutex_ is held and notify_all() is issued before
// the lock is released at the end of the function.
147  void stopWorker() {
148  //std::cout <<"stopWorker"<<std::endl;
149  using namespace boost::interprocess;
150  scoped_lock<named_mutex> lock(mutex_);
151  *stop_ = true;
152  //std::cout <<"stopWorker sending notification"<<std::endl;
153  cndFromMain_.notify_all();
154  }
155 
156  // ---------- const member functions ---------------------------
// Returns a const reference to the stored smName_ string (no copy is made).
157  std::string const& sharedMemoryName() const { return smName_; }
// Returns uniqueName("") by value, i.e. whatever uniqueName produces for an
// empty base string.
158  std::string uniqueID() const { return uniqueName(""); }
159 
160  // Should only be called after calling `doTransition`.
// Returns the bool currently stored at keepEvent_. The read is not guarded by
// mutex_ in this accessor.
161  bool shouldKeepEvent() const { return *keepEvent_; }
162 
// Returns the stored maxWaitInSeconds_ value; never throws.
163  unsigned int maxWaitInSeconds() const noexcept { return maxWaitInSeconds_; }
164 
165  private:
167  const unsigned long long initValue_;
168  const unsigned long long* ptr_;
169 
// True once the value at ptr_ differs from the recorded initValue_.
// NOTE(review): the enclosing type's opening line is missing from this
// listing; judging by the name, a changed value is taken to mean the worker
// has finished -- confirm against the full header.
170  [[nodiscard]] bool workerFinished() const noexcept { return initValue_ != *ptr_; }
171  };
172 
// Builds a CheckWorkerStatus from the value currently stored at iPtr and from
// iPtr itself. iPtr is dereferenced unconditionally, so it must not be null.
// NOTE(review): presumably these initialize initValue_ and ptr_ respectively,
// so the result can later detect a change of *iPtr -- confirm against the
// CheckWorkerStatus definition.
173  [[nodiscard]] CheckWorkerStatus initCheckWorkerStatus(unsigned long long* iPtr) const noexcept {
174  return {*iPtr, iPtr};
175  }
176 
// Static helper returning a BufferInfo pointer for the given name and managed
// shared-memory segment. The definition is not in this listing; presumably
// iWhich names the object inside `mem` -- TODO confirm.
177  static BufferInfo* bufferInfo(const char* iWhich, boost::interprocess::managed_shared_memory& mem);
178 
// Produces a name string from iBase (taken by value). The definition is not in
// this listing; uniqueID() calls it with an empty base.
179  std::string uniqueName(std::string iBase) const;
180 
// Waiting primitives used by the setup/transition templates in this class.
// All take the caller's already-acquired scoped_lock on the named mutex by
// reference. Callers in this header treat a false return as "did not
// succeed" and either give up or retry. Definitions are not in this listing.
//
// Transition-aware wait: additionally receives the transition kind and ID.
181  bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock,
182  edm::Transition iTrans,
183  unsigned long long iTransID);
// Plain wait: used by setupWorker and setupWorkerWithRetry.
184  bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
// Follow-up wait: used by doTransitionWithRetry after the first wait failed.
185  bool continueWait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
186 
187  // ---------- member data --------------------------------
188  int id_;
189  unsigned int maxWaitInSeconds_;
191  boost::interprocess::managed_shared_memory managed_sm_;
194 
195  boost::interprocess::named_mutex mutex_;
196  boost::interprocess::named_condition cndFromMain_;
197 
198  boost::interprocess::named_condition cndToMain_;
199 
201  unsigned long long* transitionID_;
202  bool* stop_;
203  bool* keepEvent_;
204  };
205 } // namespace edm::shared_memory
206 
207 #endif
void setupWorkerWithRetry(F &&iF, FRETRY &&iRetry)
BufferInfo * toWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
boost::interprocess::named_condition cndToMain_
const char * ptr_
Definition: DataKey.cc:76
bool wait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock, edm::Transition iTrans, unsigned long long iTransID)
BufferInfo * fromWorkerBufferInfo()
This can be used with ReadBuffer to keep Controller and Worker in sync.
boost::interprocess::named_mutex mutex_
std::string const & sharedMemoryName() const
Transition
Definition: Transition.h:12
uint16_t mem[nChs][nEvts]
const ControllerChannel & operator=(const ControllerChannel &)=delete
#define noexcept
boost::interprocess::named_condition cndFromMain_
ControllerChannel(std::string const &iName, int iID, unsigned int iMaxWaitInSeconds)
boost::interprocess::managed_shared_memory managed_sm_
std::string uniqueName(std::string iBase) const
bool continueWait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock)
CheckWorkerStatus initCheckWorkerStatus(unsigned long long *iPtr) const
static BufferInfo * bufferInfo(const char *iWhich, boost::interprocess::managed_shared_memory &mem)
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:281
bool doTransition(F &&iF, edm::Transition iTrans, unsigned long long iTransitionID)
bool doTransitionWithRetry(F &&iF, FRETRY &&iRetry, edm::Transition iTrans, unsigned long long iTransitionID)