CMS 3D CMS Logo

ControllerChannel.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/SharedMemory
4 // Class : ControllerChannel
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: 21/01/2020
11 //
12 
13 // system include files
14 #include <cassert>
15 
16 // user include files
19 
20 //
21 // constants, enums and typedefs
22 //
23 using namespace edm::shared_memory;
24 using namespace boost::interprocess;
25 
26 //
27 // static data member definitions
28 //
29 
30 //
31 // constructors and destructor
32 //
33 
34 ControllerChannel::ControllerChannel(std::string const& iName, int id, unsigned int iMaxWaitInSeconds)
35  : id_{id},
36  maxWaitInSeconds_{iMaxWaitInSeconds},
37  smName_{uniqueName(iName)},
38  managed_sm_{open_or_create, smName_.c_str(), 1024},
39  toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)},
40  fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)},
41  mutex_{open_or_create, uniqueName(channel_names::kMutex).c_str()},
42  cndFromMain_{open_or_create, uniqueName(channel_names::kConditionFromMain).c_str()},
43  cndToMain_{open_or_create, uniqueName(channel_names::kConditionToMain).c_str()} {
44  managed_sm_.destroy<bool>(channel_names::kStop);
45  stop_ = managed_sm_.construct<bool>(channel_names::kStop)(false);
46  assert(stop_);
47  keepEvent_ = managed_sm_.construct<bool>(channel_names::kKeepEvent)(true);
48  assert(keepEvent_);
49 
50  managed_sm_.destroy<edm::Transition>(channel_names::kTransitionType);
51  transitionType_ =
53  assert(transitionType_);
54 
55  managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
56  transitionID_ = managed_sm_.construct<unsigned long long>(channel_names::kTransitionID)(0);
57  assert(transitionID_);
58 }
59 
62  managed_sm_.destroy<bool>(channel_names::kStop);
63  managed_sm_.destroy<unsigned int>(channel_names::kTransitionType);
64  managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
67 
71 }
72 
73 //
74 // member functions
75 //
77  auto pid = getpid();
78  iBase += std::to_string(pid);
79  iBase += "_";
80  iBase += std::to_string(id_);
81 
82  return iBase;
83 }
84 
85 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock, edm::Transition iTrans, unsigned long long iTransID) {
86  *transitionType_ = iTrans;
87  *transitionID_ = iTransID;
88  //std::cout << id_ << " notifying" << std::endl;
89  cndFromMain_.notify_all();
90 
91  //std::cout << id_ << " waiting" << std::endl;
92  using namespace boost::posix_time;
93  //this has to be after change to *transitionID_ as that is the variable re-used for the check
94  auto workerStatus = initCheckWorkerStatus(transitionID_);
95  if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
96  not workerStatus.workerFinished()) {
97  //std::cout << id_ << " waiting FAILED" << std::endl;
98  return false;
99  }
100  return true;
101 }
102 
103 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock) {
104  //std::cout << id_ << " waiting" << std::endl;
105  using namespace boost::posix_time;
106  *transitionID_ = 0;
107  auto workerStatus = initCheckWorkerStatus(transitionID_);
108  if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
109  not workerStatus.workerFinished()) {
110  //std::cout << id_ << " waiting FAILED" << std::endl;
111  return false;
112  }
113  return true;
114 }
115 
116 bool ControllerChannel::continueWait(scoped_lock<named_mutex>& lock) {
117  //std::cout << id_ << " waiting" << std::endl;
118  using namespace boost::posix_time;
119  //NOTE: value of *transitionID_ can not have been changed by the worker since call to wait()
120  // as we've had the lock since the end of that call.
121  auto workerStatus = initCheckWorkerStatus(transitionID_);
122  if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
123  not workerStatus.workerFinished()) {
124  //std::cout << id_ << " waiting FAILED" << std::endl;
125  return false;
126  }
127  return true;
128 }
129 
130 //
131 // const member functions
132 //
133 
134 //
135 // static member functions
136 //
137 BufferInfo* ControllerChannel::bufferInfo(const char* iWhich, managed_shared_memory& mem) {
138  mem.destroy<BufferInfo>(iWhich);
139  BufferInfo* v = mem.construct<BufferInfo>(iWhich)();
140  return v;
141 }
constexpr char const *const kConditionToMain
Definition: channel_names.h:33
boost::interprocess::named_condition cndToMain_
double seconds()
bool wait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock, edm::Transition iTrans, unsigned long long iTransID)
constexpr char const *const kConditionFromMain
Definition: channel_names.h:32
constexpr char const *const kStop
Definition: channel_names.h:34
constexpr char const *const kTransitionType
Definition: channel_names.h:36
std::string to_string(const V &value)
Definition: OMSAccess.h:71
assert(be >=bs)
constexpr char const *const kMutex
Definition: channel_names.h:31
Transition
Definition: Transition.h:12
constexpr char const *const kKeepEvent
Definition: channel_names.h:35
uint16_t mem[nChs][nEvts]
CheckWorkerStatus initCheckWorkerStatus(unsigned long long *iPtr) const noexcept
std::string uniqueName(std::string iBase) const
boost::interprocess::named_condition cndFromMain_
constexpr char const *const kFromWorkerBufferInfo
Definition: channel_names.h:30
ControllerChannel(std::string const &iName, int iID, unsigned int iMaxWaitInSeconds)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
constexpr char const *const kTransitionID
Definition: channel_names.h:37
constexpr char const *const kToWorkerBufferInfo
Definition: channel_names.h:29
boost::interprocess::managed_shared_memory managed_sm_
bool continueWait(boost::interprocess::scoped_lock< boost::interprocess::named_mutex > &lock)
static BufferInfo * bufferInfo(const char *iWhich, boost::interprocess::managed_shared_memory &mem)