CMS 3D CMS Logo

interprocess_random.cc
Go to the documentation of this file.
1 #include "boost/program_options.hpp"
2 
3 #include <atomic>
4 #include <csignal>
5 #include <iostream>
6 #include <string>
7 #include <thread>
8 
10 #include "DataFormats/TestObjects/interface/ToyProducts.h"
12 
14 
21 
22 static char const* const kMemoryNameOpt = "memory-name";
23 static char const* const kMemoryNameCommandOpt = "memory-name,m";
24 static char const* const kUniqueIDOpt = "unique-id";
25 static char const* const kUniqueIDCommandOpt = "unique-id,i";
26 static char const* const kHelpOpt = "help";
27 static char const* const kHelpCommandOpt = "help,h";
28 
29 //NOTE: Can use TestProcessor as the harness for the worker
30 
31 using SentType = std::pair<edmtest::IntProduct, edm::RandomNumberGeneratorState>;
32 
33 using namespace edm::shared_memory;
34 class Harness {
35 public:
36  Harness(std::string const& iConfig, edm::ServiceToken iToken)
37  : tester_(edm::test::TestProcessor::Config{iConfig}, iToken) {}
38 
39  edmtest::IntProduct getBeginLumiValue(unsigned int iLumi) {
40  auto lumi = tester_.testBeginLuminosityBlock(iLumi);
41  return *lumi.get<edmtest::IntProduct>("lumi");
42  }
43 
44  edmtest::IntProduct getEventValue() {
45  auto event = tester_.test();
46  return *event.get<edmtest::IntProduct>();
47  }
48 
49 private:
51 };
52 
53 int main(int argc, char* argv[]) {
54  std::string descString(argv[0]);
55  descString += " [--";
56  descString += kMemoryNameOpt;
57  descString += "] memory_name";
58  boost::program_options::options_description desc(descString);
59 
60  desc.add_options()(kHelpCommandOpt, "produce help message")(
61  kMemoryNameCommandOpt, boost::program_options::value<std::string>(), "memory name")(
62  kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id");
63 
64  boost::program_options::positional_options_description p;
65  p.add(kMemoryNameOpt, 1);
66  p.add(kUniqueIDOpt, 2);
67 
68  boost::program_options::options_description all_options("All Options");
69  all_options.add(desc);
70 
71  boost::program_options::variables_map vm;
72  try {
73  store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
74  notify(vm);
75  } catch (boost::program_options::error const& iException) {
76  std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
77  << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
78  return 1;
79  }
80 
81  if (vm.count(kHelpOpt)) {
82  std::cout << desc << std::endl;
83  return 0;
84  }
85 
86  if (!vm.count(kMemoryNameOpt)) {
87  std::cout << " no argument given" << std::endl;
88  return 1;
89  }
90 
91  if (!vm.count(kUniqueIDOpt)) {
92  std::cout << " no second argument given" << std::endl;
93  return 1;
94  }
95 
96  WorkerMonitorThread monitorThread;
97 
98  monitorThread.startThread();
99 
100  try {
101  std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
102  std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
103  {
104  //This class is holding the lock
105  WorkerChannel communicationChannel(memoryName, uniqueID);
106 
107  WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
108  ReadBuffer sm_readbuffer{std::string("Rand") + memoryName, communicationChannel.toWorkerBufferInfo()};
109  int counter = 0;
110 
111  //The lock must be released if there is a catastrophic signal
112  auto lockPtr = communicationChannel.accessLock();
113  monitorThread.setAction([lockPtr]() {
114  if (lockPtr) {
115  std::cerr << "SIGNAL CAUGHT: unlock\n";
116  lockPtr->unlock();
117  }
118  });
119 
120  using TCSerializer = ROOTSerializer<SentType, WriteBuffer>;
121  TCSerializer serializer(sm_buffer);
122  TCSerializer bl_serializer(sm_buffer);
123 
125  TCDeserializer random_deserializer(sm_readbuffer);
126 
127  std::cerr << uniqueID << " process: initializing " << std::endl;
128  int nlines;
129  std::cin >> nlines;
130 
132  for (int i = 0; i < nlines; ++i) {
133  std::string c;
134  std::getline(std::cin, c);
135  std::cerr << c << "\n";
136  configuration += c + "\n";
137  }
138 
140  auto serviceToken =
141  edm::ServiceRegistry::createContaining(std::unique_ptr<edm::RandomNumberGenerator>(randomService));
142 
143  Harness harness(configuration, serviceToken);
144 
145  //Either ROOT or the Framework are overriding the signal handlers
146  monitorThread.setupSignalHandling();
147 
148  std::cerr << uniqueID << " process: done initializing" << std::endl;
149  communicationChannel.workerSetupDone();
150 
151  std::cerr << uniqueID << " process: waiting " << counter << std::endl;
152  communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
153  ++counter;
154  switch (iTransition) {
156  std::cerr << uniqueID << " process: start beginLumi " << std::endl;
157  auto randState = random_deserializer.deserialize();
158  std::cerr << " state " << randState.seed_ << std::endl;
159  randomService->setState(randState.state_, randState.seed_);
160  SentType toSend;
161  toSend.first = harness.getBeginLumiValue(iTransitionID);
162  toSend.second.state_ = randomService->getState();
163  toSend.second.seed_ = randomService->mySeed();
164  bl_serializer.serialize(toSend);
165  std::cerr << uniqueID << " process: end beginLumi " << toSend.first.value << std::endl;
166 
167  break;
168  }
169  case edm::Transition::Event: {
170  std::cerr << uniqueID << " process: begin event " << counter << std::endl;
171  auto randState = random_deserializer.deserialize();
172  randomService->setState(randState.state_, randState.seed_);
173  SentType toSend;
174  toSend.first = harness.getEventValue();
175  toSend.second.state_ = randomService->getState();
176  toSend.second.seed_ = randomService->mySeed();
177  std::cerr << uniqueID << " process: end event " << counter << std::endl;
178 
179  serializer.serialize(toSend);
180  std::cerr << uniqueID << " process: " << toSend.first.value << " " << counter << std::endl;
181  //usleep(10000000);
182  break;
183  }
184  default: {
185  assert(false);
186  }
187  }
188  std::cerr << uniqueID << " process: notifying and waiting" << counter << std::endl;
189  });
190  }
191  } catch (std::exception const& iExcept) {
192  std::cerr << "caught exception \n" << iExcept.what() << "\n";
193  return 1;
194  } catch (...) {
195  std::cerr << "caught unknown exception";
196  return 1;
197  }
198  return 0;
199 }
edmtest::ThingCollection getEventValue()
Definition: interprocess.cc:42
boost::interprocess::scoped_lock< boost::interprocess::named_mutex > * accessLock()
the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of ...
Definition: WorkerChannel.h:48
edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi)
Definition: interprocess.cc:37
void setState(std::vector< unsigned long > const &, long seed)
void setupSignalHandling()
Sets the unix signal handler which communicates with the thread.
void setAction(std::function< void()> iFunc)
static char const *const kHelpOpt
edmtest::IntProduct getBeginLumiValue(unsigned int iLumi)
BufferInfo * toWorkerBufferInfo()
This can be used with ReadBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:51
void workerSetupDone()
Matches the ControllerChannel::setupWorker call.
Definition: WorkerChannel.h:56
std::pair< edmtest::IntProduct, edm::RandomNumberGeneratorState > SentType
Transition
Definition: Transition.h:12
static ServiceToken createContaining(std::unique_ptr< T > iService)
create a service token that holds the service defined by iService
edmtest::IntProduct getEventValue()
BufferInfo * fromWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:53
static char const *const kUniqueIDCommandOpt
static char const *const kMemoryNameOpt
static char const *const kHelpCommandOpt
Harness(std::string const &iConfig, edm::ServiceToken iToken)
static char const *const kMemoryNameCommandOpt
HLT enums.
static std::atomic< unsigned int > counter
Definition: Config.py:1
static char const *const kUniqueIDOpt
int main(int argc, char *argv[])