CMS 3D CMS Logo

ControllerChannel.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/SharedMemory
4 // Class : ControllerChannel
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: 21/01/2020
11 //
12 
13 // system include files
14 #include <cassert>
15 
16 // user include files
19 
20 //
21 // constants, enums and typedefs
22 //
23 using namespace edm::shared_memory;
24 using namespace boost::interprocess;
25 
26 //
27 // static data member definitions
28 //
29 
30 //
31 // constructors and destructor
32 //
33 
34 ControllerChannel::ControllerChannel(std::string const& iName, int id, unsigned int iMaxWaitInSeconds)
35  : id_{id},
36  maxWaitInSeconds_{iMaxWaitInSeconds},
37  smName_{uniqueName(iName)},
38  smRemover_{smName_},
39  managed_sm_{create_only, smName_.c_str(), 1024},
40  toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)},
41  fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)},
42  mutexRemover_{uniqueName(channel_names::kMutex)},
43  mutex_{create_only, uniqueName(channel_names::kMutex).c_str()},
44  cndFromMainRemover_{uniqueName(channel_names::kConditionFromMain)},
45  cndFromMain_{create_only, uniqueName(channel_names::kConditionFromMain).c_str()},
46  cndToMainRemover_{uniqueName(channel_names::kConditionToMain)},
47  cndToMain_{create_only, uniqueName(channel_names::kConditionToMain).c_str()} {
48  stop_ = managed_sm_.construct<bool>(channel_names::kStop)(false);
49  assert(stop_);
50  keepEvent_ = managed_sm_.construct<bool>(channel_names::kKeepEvent)(true);
51  assert(keepEvent_);
52 
53  transitionType_ =
55  assert(transitionType_);
56 
57  transitionID_ = managed_sm_.construct<unsigned long long>(channel_names::kTransitionID)(0);
58  assert(transitionID_);
59 }
60 
63  managed_sm_.destroy<bool>(channel_names::kStop);
64  managed_sm_.destroy<unsigned int>(channel_names::kTransitionType);
65  managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
68 }
69 
70 //
71 // member functions
72 //
74  auto pid = getpid();
75  iBase += std::to_string(pid);
76  iBase += "_";
77  iBase += std::to_string(id_);
78 
79  return iBase;
80 }
81 
82 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock, edm::Transition iTrans, unsigned long long iTransID) {
83  *transitionType_ = iTrans;
84  *transitionID_ = iTransID;
85  //std::cout << id_ << " notifying" << std::endl;
86  cndFromMain_.notify_all();
87 
88  //std::cout << id_ << " waiting" << std::endl;
89  using namespace boost::posix_time;
90  //this has to be after change to *transitionID_ as that is the variable re-used for the check
91  auto workerStatus = initCheckWorkerStatus(transitionID_);
92  if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
93  not workerStatus.workerFinished()) {
94  //std::cout << id_ << " waiting FAILED" << std::endl;
95  return false;
96  }
97  return true;
98 }
99 
100 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock) {
101  //std::cout << id_ << " waiting" << std::endl;
102  using namespace boost::posix_time;
103  *transitionID_ = 0;
104  auto workerStatus = initCheckWorkerStatus(transitionID_);
105  if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
106  not workerStatus.workerFinished()) {
107  //std::cout << id_ << " waiting FAILED" << std::endl;
108  return false;
109  }
110  return true;
111 }
112 
113 bool ControllerChannel::continueWait(scoped_lock<named_mutex>& lock) {
114  //std::cout << id_ << " waiting" << std::endl;
115  using namespace boost::posix_time;
116  //NOTE: value of *transitionID_ can not have been changed by the worker since call to wait()
117  // as we've had the lock since the end of that call.
118  auto workerStatus = initCheckWorkerStatus(transitionID_);
119  if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
120  not workerStatus.workerFinished()) {
121  //std::cout << id_ << " waiting FAILED" << std::endl;
122  return false;
123  }
124  return true;
125 }
126 
127 //
128 // const member functions
129 //
130 
131 //
132 // static member functions
133 //
134 BufferInfo* ControllerChannel::bufferInfo(const char* iWhich, managed_shared_memory& mem) {
135  mem.destroy<BufferInfo>(iWhich);
136  BufferInfo* v = mem.construct<BufferInfo>(iWhich)();
137  return v;
138 }
constexpr char const *const kConditionToMain
Definition: channel_names.h:33
boost::interprocess::named_condition cndToMain_
bool wait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock, edm::Transition iTrans, unsigned long long iTransID)
constexpr char const *const kConditionFromMain
Definition: channel_names.h:32
constexpr char const *const kStop
Definition: channel_names.h:34
constexpr char const *const kTransitionType
Definition: channel_names.h:36
assert(be >=bs)
static std::string to_string(const XMLCh *ch)
constexpr char const *const kMutex
Definition: channel_names.h:31
Transition
Definition: Transition.h:12
constexpr char const *const kKeepEvent
Definition: channel_names.h:35
uint16_t mem[nChs][nEvts]
CheckWorkerStatus initCheckWorkerStatus(unsigned long long *iPtr) const noexcept
std::string uniqueName(std::string iBase) const
boost::interprocess::named_condition cndFromMain_
constexpr char const *const kFromWorkerBufferInfo
Definition: channel_names.h:30
ControllerChannel(std::string const &iName, int iID, unsigned int iMaxWaitInSeconds)
constexpr char const *const kTransitionID
Definition: channel_names.h:37
constexpr char const *const kToWorkerBufferInfo
Definition: channel_names.h:29
boost::interprocess::managed_shared_memory managed_sm_
bool continueWait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock)
static BufferInfo * bufferInfo(const char *iWhich, boost::interprocess::managed_shared_memory &mem)