CMS 3D CMS Logo

ControllerChannel.h
Go to the documentation of this file.
1 #ifndef FWCore_SharedMemory_ControllerChannel_h
2 #define FWCore_SharedMemory_ControllerChannel_h
3 // -*- C++ -*-
4 //
5 // Package: FWCore/SharedMemory
6 // Class : ControllerChannel
7 //
16 //
17 // Original Author: Chris Jones
18 // Created: 21/01/2020
19 //
20 
21 // system include files
22 #include <string>
23 #include <iostream>
24 #include "boost/interprocess/managed_shared_memory.hpp"
25 #include "boost/interprocess/sync/named_mutex.hpp"
26 #include "boost/interprocess/sync/named_condition.hpp"
27 #include "boost/interprocess/sync/scoped_lock.hpp"
28 
29 // user include files
33 
34 // forward declarations
35 
36 namespace edm::shared_memory {
38  public:
// Builds a channel from a name, an integer id and a maximum wait time given in
// seconds. The definition is out of line (ControllerChannel.cc).
42  ControllerChannel(std::string const& iName, int iID, unsigned int iMaxWaitInSeconds);
// Copy construction and copy assignment are both deleted: a channel cannot be copied.
44  ControllerChannel(const ControllerChannel&) = delete;
45  const ControllerChannel& operator=(const ControllerChannel&) = delete;
48 
49  // ---------- member functions ---------------------------
50 
55  template <typename F>
56  void setupWorker(F&& iF) {
57  using namespace boost::interprocess;
58  scoped_lock<named_mutex> lock(mutex_);
59  iF();
60  using namespace boost::posix_time;
61  //std::cout << id_ << " waiting for external process" << std::endl;
62  if (not wait(lock)) {
63  //std::cout << id_ << " FAILED waiting for external process" << std::endl;
64  *stop_ = true;
65  throw cms::Exception("ExternalFailed")
66  << "Failed waiting for external process while setting up the process. Timed out after " << maxWaitInSeconds_
67  << " seconds.";
68  } else {
69  //std::cout << id_ << " done waiting for external process" << std::endl;
70  }
71  }
72 
76  template <typename F, typename FRETRY>
77  void setupWorkerWithRetry(F&& iF, FRETRY&& iRetry) {
78  using namespace boost::interprocess;
79  scoped_lock<named_mutex> lock(mutex_);
80  iF();
81  using namespace boost::posix_time;
82  //std::cout << id_ << " waiting for external process" << std::endl;
83  bool shouldContinue = true;
84  long long int retryCount = 0;
85  do {
86  if (not wait(lock)) {
87  if (not iRetry()) {
88  *stop_ = true;
89  throw cms::Exception("ExternalFailed")
90  << "Failed waiting for external process while setting up the process. Timed out after "
91  << maxWaitInSeconds_ << " seconds with " << retryCount << " retries.";
92  }
93  //std::cerr<<"retrying\n";
94  ++retryCount;
95  } else {
96  shouldContinue = false;
97  }
98  } while (shouldContinue);
99  }
100 
101  template <typename F>
102  bool doTransition(F&& iF, edm::Transition iTrans, unsigned long long iTransitionID) {
103  using namespace boost::interprocess;
104 
105  //std::cout << id_ << " taking from lock" << std::endl;
106  scoped_lock<named_mutex> lock(mutex_);
107  if (not wait(lock, iTrans, iTransitionID)) {
108  return false;
109  }
110  //std::cout <<id_<<"running doTranstion command"<<std::endl;
111  iF();
112  return true;
113  }
114 
115  template <typename F, typename FRETRY>
116  bool doTransitionWithRetry(F&& iF, FRETRY&& iRetry, edm::Transition iTrans, unsigned long long iTransitionID) {
117  using namespace boost::interprocess;
118 
119  //std::cout << id_ << " taking from lock" << std::endl;
120  scoped_lock<named_mutex> lock(mutex_);
121  if (not wait(lock, iTrans, iTransitionID)) {
122  if (not iRetry()) {
123  return false;
124  }
125  bool shouldContinue = true;
126  do {
127  using namespace boost::posix_time;
128  if (not continueWait(lock)) {
129  if (not iRetry()) {
130  return false;
131  }
132  } else {
133  shouldContinue = false;
134  }
135  } while (shouldContinue);
136  }
137  //std::cout <<id_<<"running doTranstion command"<<std::endl;
138  iF();
139  return true;
140  }
141 
146 
147  void stopWorker() {
148  //std::cout <<"stopWorker"<<std::endl;
149  using namespace boost::interprocess;
150  scoped_lock<named_mutex> lock(mutex_);
151  *stop_ = true;
152  //std::cout <<"stopWorker sending notification"<<std::endl;
153  cndFromMain_.notify_all();
154  }
155 
156  // ---------- const member functions ---------------------------
157  std::string const& sharedMemoryName() const { return smName_; }
158  std::string uniqueID() const { return uniqueName(""); }
159 
160  //should only be called after calling `doTransition`
161  bool shouldKeepEvent() const { return *keepEvent_; }
162 
163  unsigned int maxWaitInSeconds() const noexcept { return maxWaitInSeconds_; }
164 
165  private:
// Value captured when the status object was created (initCheckWorkerStatus
// fills it with *iPtr).
167  const unsigned long long initValue_;
// Location that is re-read later to detect a change (initCheckWorkerStatus
// fills it with iPtr).
168  const unsigned long long* ptr_;
169 
// True once the value currently at ptr_ differs from the captured initValue_.
170  [[nodiscard]] bool workerFinished() const noexcept { return initValue_ != *ptr_; }
171  };
172 
173  [[nodiscard]] CheckWorkerStatus initCheckWorkerStatus(unsigned long long* iPtr) const noexcept {
174  return {*iPtr, iPtr};
175  }
176 
// Static helper returning a BufferInfo pointer for the given name and managed
// shared-memory segment. Defined in ControllerChannel.cc.
// NOTE(review): whether it finds or constructs the object is not visible here — confirm.
177  static BufferInfo* bufferInfo(const char* iWhich, boost::interprocess::managed_shared_memory& mem);
178 
// Builds a name from iBase; uniqueID() calls it with an empty base.
// Defined in ControllerChannel.cc.
179  std::string uniqueName(std::string iBase) const;
180 
// Wait helpers. Each takes the already-held scoped lock and returns bool.
// Callers in this header treat a false result as a failed (timed-out) wait.
// The three-argument overload is the first wait of a transition.
181  bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock,
182  edm::Transition iTrans,
183  unsigned long long iTransID);
// Used by setupWorker and setupWorkerWithRetry.
184  bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
// Used by doTransitionWithRetry for every attempt after the first wait failed.
185  bool continueWait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
186 
187  // ---------- member data --------------------------------
// Integer id of this channel. Only the disabled debug prints in this header
// refer to it.
188  int id_;
// Value returned by maxWaitInSeconds() and reported in the "Timed out after"
// exception messages.
189  unsigned int maxWaitInSeconds_;
191  boost::interprocess::managed_shared_memory managed_sm_;
194 
// Named mutex locked by the setup, transition and stop methods in this header.
195  boost::interprocess::named_mutex mutex_;
// Condition notified (notify_all) by stopWorker().
196  boost::interprocess::named_condition cndFromMain_;
197 
// NOTE(review): presumably the condition the wait helpers block on — confirm in ControllerChannel.cc.
198  boost::interprocess::named_condition cndToMain_;
199 
201  unsigned long long* transitionID_;
// Set to true by stopWorker() and by the setup methods just before they throw.
202  bool* stop_;
// Read by shouldKeepEvent().
203  bool* keepEvent_;
204  };
205 } // namespace edm::shared_memory
206 
207 #endif
edm::shared_memory::ControllerChannel::cndToMain_
boost::interprocess::named_condition cndToMain_
Definition: ControllerChannel.h:198
edm::shared_memory::ControllerChannel::transitionType_
edm::Transition * transitionType_
Definition: ControllerChannel.h:200
edm::shared_memory::ControllerChannel::maxWaitInSeconds
unsigned int maxWaitInSeconds() const noexcept
Definition: ControllerChannel.h:163
edm::shared_memory::ControllerChannel::setupWorker
void setupWorker(F &&iF)
Definition: ControllerChannel.h:56
edm::shared_memory::ControllerChannel::toWorkerBufferInfo
BufferInfo * toWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
Definition: ControllerChannel.h:143
edm::shared_memory::ControllerChannel::keepEvent_
bool * keepEvent_
Definition: ControllerChannel.h:203
edm::shared_memory::ControllerChannel::managed_sm_
boost::interprocess::managed_shared_memory managed_sm_
Definition: ControllerChannel.h:191
BufferInfo.h
edm::shared_memory::ControllerChannel::wait
bool wait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock, edm::Transition iTrans, unsigned long long iTransID)
edm::shared_memory::ControllerChannel::CheckWorkerStatus::initValue_
const unsigned long long initValue_
Definition: ControllerChannel.h:167
edm::shared_memory::ControllerChannel::initCheckWorkerStatus
CheckWorkerStatus initCheckWorkerStatus(unsigned long long *iPtr) const noexcept
Definition: ControllerChannel.h:173
edm::shared_memory::ControllerChannel::uniqueName
std::string uniqueName(std::string iBase) const
Definition: ControllerChannel.cc:76
edm::shared_memory::ControllerChannel::continueWait
bool continueWait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock)
Definition: ControllerChannel.cc:116
edm::shared_memory::ControllerChannel::uniqueID
std::string uniqueID() const
Definition: ControllerChannel.h:158
mem
uint16_t mem[nChs][nEvts]
Definition: recycleTccEmu.cc:13
edm::shared_memory::ControllerChannel::fromWorkerBufferInfo_
BufferInfo * fromWorkerBufferInfo_
Definition: ControllerChannel.h:193
edm::shared_memory::ControllerChannel::transitionID_
unsigned long long * transitionID_
Definition: ControllerChannel.h:201
edm::shared_memory::ControllerChannel::shouldKeepEvent
bool shouldKeepEvent() const
Definition: ControllerChannel.h:161
watchdog.const
const
Definition: watchdog.py:83
ptr_
const char * ptr_
Definition: DataKey.cc:76
F
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
edm::shared_memory::ControllerChannel::smName_
std::string smName_
Definition: ControllerChannel.h:190
edm::shared_memory::BufferInfo
Definition: BufferInfo.h:27
edm::shared_memory::ControllerChannel::CheckWorkerStatus::workerFinished
bool workerFinished() const noexcept
Definition: ControllerChannel.h:170
edm::shared_memory::ControllerChannel::cndFromMain_
boost::interprocess::named_condition cndFromMain_
Definition: ControllerChannel.h:196
edm::shared_memory::ControllerChannel::toWorkerBufferInfo_
BufferInfo * toWorkerBufferInfo_
Definition: ControllerChannel.h:192
edm::shared_memory::ControllerChannel::stopWorker
void stopWorker()
Definition: ControllerChannel.h:147
Transition.h
edm::shared_memory::ControllerChannel::bufferInfo
static BufferInfo * bufferInfo(const char *iWhich, boost::interprocess::managed_shared_memory &mem)
Definition: ControllerChannel.cc:137
edm::shared_memory::ControllerChannel::id_
int id_
Definition: ControllerChannel.h:188
edm::shared_memory::ControllerChannel::~ControllerChannel
~ControllerChannel()
Definition: ControllerChannel.cc:60
edm::Transition
Transition
Definition: Transition.h:12
edm::shared_memory::ControllerChannel::fromWorkerBufferInfo
BufferInfo * fromWorkerBufferInfo()
This can be used with ReadBuffer to keep Controller and Worker in sync.
Definition: ControllerChannel.h:145
CommonMethods.lock
def lock()
Definition: CommonMethods.py:81
edm::shared_memory::ControllerChannel::ControllerChannel
ControllerChannel(std::string const &iName, int iID, unsigned int iMaxWaitInSeconds)
Definition: ControllerChannel.cc:34
edm::shared_memory::ControllerChannel::doTransitionWithRetry
bool doTransitionWithRetry(F &&iF, FRETRY &&iRetry, edm::Transition iTrans, unsigned long long iTransitionID)
Definition: ControllerChannel.h:116
edm::shared_memory::ControllerChannel::CheckWorkerStatus
Definition: ControllerChannel.h:166
edm::shared_memory
Definition: buffer_names.h:27
edm::shared_memory::ControllerChannel::sharedMemoryName
std::string const & sharedMemoryName() const
Definition: ControllerChannel.h:157
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::shared_memory::ControllerChannel::CheckWorkerStatus::ptr_
const unsigned long long * ptr_
Definition: ControllerChannel.h:168
edm::shared_memory::ControllerChannel::stop_
bool * stop_
Definition: ControllerChannel.h:202
edm::shared_memory::ControllerChannel::mutex_
boost::interprocess::named_mutex mutex_
Definition: ControllerChannel.h:195
edm::shared_memory::ControllerChannel::operator=
const ControllerChannel & operator=(const ControllerChannel &)=delete
Exception
Definition: hltDiff.cc:245
edm::shared_memory::ControllerChannel::maxWaitInSeconds_
unsigned int maxWaitInSeconds_
Definition: ControllerChannel.h:189
Exception.h
edm::shared_memory::ControllerChannel
Definition: ControllerChannel.h:37
edm::shared_memory::ControllerChannel::setupWorkerWithRetry
void setupWorkerWithRetry(F &&iF, FRETRY &&iRetry)
Definition: ControllerChannel.h:77
edm::shared_memory::ControllerChannel::doTransition
bool doTransition(F &&iF, edm::Transition iTrans, unsigned long long iTransitionID)
Definition: ControllerChannel.h:102