CMS 3D CMS Logo

externalGenerator.cc
Go to the documentation of this file.
1 #include "boost/program_options.hpp"
2 
3 #include <atomic>
4 #include <csignal>
5 #include <iostream>
6 #include <string>
7 #include <thread>
8 #include <memory>
9 #include <filesystem>
10 
12 
20 
22 
29 
31 
32 static char const* const kMemoryNameOpt = "memory-name";
33 static char const* const kMemoryNameCommandOpt = "memory-name,m";
34 static char const* const kUniqueIDOpt = "unique-id";
35 static char const* const kUniqueIDCommandOpt = "unique-id,i";
36 static char const* const kHelpOpt = "help";
37 static char const* const kHelpCommandOpt = "help,h";
38 static char const* const kVerboseOpt = "verbose";
39 static char const* const kVerboseCommandOpt = "verbose,v";
40 
41 //This application only uses 1 thread
43 
44 //NOTE: Can use TestProcessor as the harness for the worker
45 
46 using namespace edm::shared_memory;
47 class Harness {
48 public:
49  Harness(std::string const& iConfig, edm::ServiceToken iToken)
50  : tester_(edm::test::TestProcessor::Config{iConfig}, iToken) {}
51 
53  auto lumi = tester_.testBeginLuminosityBlock(iLumi);
54  ExternalGeneratorLumiInfo returnValue;
55  returnValue.header_ = *lumi.get<GenLumiInfoHeader>();
56  return returnValue;
57  }
58 
60  ExternalGeneratorEventInfo returnValue;
61  auto event = tester_.test();
62  returnValue.hepmc_ = *event.get<edm::HepMCProduct>("unsmeared");
63  returnValue.eventInfo_ = *event.get<GenEventInfoProduct>();
64  returnValue.keepEvent_ = event.modulePassed();
65  return returnValue;
66  }
67 
69  auto lumi = tester_.testEndLuminosityBlock();
70  return *lumi.get<GenLumiInfoProduct>();
71  }
72 
74  auto run = tester_.testEndRun();
75  return *run.get<GenRunInfoProduct>();
76  }
77 
78 private:
80 };
81 
82 template <typename T>
84 
85 namespace {
86  //needed for atexit handling
87  CMS_THREAD_SAFE boost::interprocess::scoped_lock<boost::interprocess::named_mutex>* s_sharedLock = nullptr;
88 
89  void atexit_handler() {
90  if (s_sharedLock) {
91  std::cerr << s_uniqueID << " process: early exit called: unlock\n";
92  s_sharedLock->unlock();
93  }
94  }
95 } // namespace
96 
97 int main(int argc, char* argv[]) {
98  std::string descString(argv[0]);
99  descString += " [--";
100  descString += kMemoryNameOpt;
101  descString += "] memory_name";
102  boost::program_options::options_description desc(descString);
103 
104  desc.add_options()(kHelpCommandOpt, "produce help message")(
105  kMemoryNameCommandOpt, boost::program_options::value<std::string>(), "memory name")(
106  kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id")(kVerboseCommandOpt,
107  "verbose output");
108 
109  boost::program_options::positional_options_description p;
110  p.add(kMemoryNameOpt, 1);
111  p.add(kUniqueIDOpt, 2);
112 
113  boost::program_options::options_description all_options("All Options");
114  all_options.add(desc);
115 
116  boost::program_options::variables_map vm;
117  try {
118  store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
119  notify(vm);
120  } catch (boost::program_options::error const& iException) {
121  std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
122  << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
123  return 1;
124  }
125 
126  if (vm.count(kHelpOpt)) {
127  std::cout << desc << std::endl;
128  return 0;
129  }
130 
131  bool verbose = false;
132  if (vm.count(kVerboseOpt)) {
133  verbose = true;
134  }
135 
136  if (!vm.count(kMemoryNameOpt)) {
137  std::cout << " no argument given" << std::endl;
138  return 1;
139  }
140 
141  if (!vm.count(kUniqueIDOpt)) {
142  std::cout << " no second argument given" << std::endl;
143  return 1;
144  }
145 
146  using namespace std::string_literals;
147  using namespace std::filesystem;
148 
149  auto newDir = path("thread"s + vm[kUniqueIDOpt].as<std::string>());
150  create_directory(newDir);
151  current_path(newDir);
152 
153  WorkerMonitorThread monitorThread;
154 
155  monitorThread.startThread();
156 
157  std::string presentState = "setting up communicationChannel";
158 
159  CMS_SA_ALLOW try {
160  std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
161  std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
162  s_uniqueID = uniqueID;
163  {
164  //This class is holding the lock
165  WorkerChannel communicationChannel(memoryName, uniqueID);
166 
167  presentState = "setting up read/write buffers";
168  WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
169  ReadBuffer sm_readbuffer{std::string("Rand") + memoryName, communicationChannel.toWorkerBufferInfo()};
170  int counter = 0;
171 
172  presentState = "setting up monitor thread";
173  //The lock must be released if there is a catastrophic signal
174  auto lockPtr = communicationChannel.accessLock();
175 
176  monitorThread.setAction([lockPtr]() {
177  if (lockPtr) {
178  std::cerr << s_uniqueID << " process: SIGNAL CAUGHT: unlock\n";
179  lockPtr->unlock();
180  }
181  });
182 
183  presentState = "setting up termination handler";
184  //be sure to unset the address of the shared lock before the lock goes away
185  s_sharedLock = lockPtr;
186  auto unsetLockPtr = [](void*) { s_sharedLock = nullptr; };
187  std::unique_ptr<decltype(s_sharedLock), decltype(unsetLockPtr)> sharedLockGuard{&s_sharedLock, unsetLockPtr};
188  std::atexit(atexit_handler);
189  auto releaseLock = []() {
190  if (s_sharedLock) {
191  std::cerr << s_uniqueID << " process: terminate called: unlock\n";
192  s_sharedLock->unlock();
193  s_sharedLock = nullptr;
194  //deactivate the abort signal
195 
196  struct sigaction act;
197  act.sa_sigaction = nullptr;
198  act.sa_flags = SA_SIGINFO;
199  sigemptyset(&act.sa_mask);
200  sigaction(SIGABRT, &act, nullptr);
201  std::abort();
202  }
203  };
204  std::set_terminate(releaseLock);
205 
206  presentState = "setting up serializers";
207  Serializer<ExternalGeneratorEventInfo> serializer(sm_buffer);
208  Serializer<ExternalGeneratorLumiInfo> bl_serializer(sm_buffer);
209  Serializer<GenLumiInfoProduct> el_serializer(sm_buffer);
210  Serializer<GenRunInfoProduct> er_serializer(sm_buffer);
211 
212  ROOTDeserializer<edm::RandomNumberGeneratorState, ReadBuffer> random_deserializer(sm_readbuffer);
213 
214  presentState = "reading configuration";
215  std::cerr << uniqueID << " process: initializing " << std::endl;
216  int nlines;
217  std::cin >> nlines;
218 
220  for (int i = 0; i < nlines; ++i) {
221  std::string c;
222  std::getline(std::cin, c);
223  if (verbose) {
224  std::cerr << c << "\n";
225  }
226  configuration += c + "\n";
227  }
228 
229  presentState = "setting up random number generator";
231  auto serviceToken =
232  edm::ServiceRegistry::createContaining(std::unique_ptr<edm::RandomNumberGenerator>(randomService));
233  Harness harness(configuration, serviceToken);
234 
235  //Some generator libraries override the signal handlers
236  monitorThread.setupSignalHandling();
237  std::set_terminate(releaseLock);
238 
239  if (verbose) {
240  std::cerr << uniqueID << " process: done initializing" << std::endl;
241  }
242  presentState = "finished initialization";
243  communicationChannel.workerSetupDone();
244 
245  presentState = "waiting for transition";
246  if (verbose)
247  std::cerr << uniqueID << " process: waiting " << counter << std::endl;
248  communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
249  ++counter;
250  switch (iTransition) {
252  presentState = "beginRun transition";
253  if (verbose)
254  std::cerr << uniqueID << " process: start beginRun " << std::endl;
255  if (verbose)
256  std::cerr << uniqueID << " process: end beginRun " << std::endl;
257 
258  break;
259  }
261  presentState = "begin lumi";
262  if (verbose)
263  std::cerr << uniqueID << " process: start beginLumi " << std::endl;
264  auto randState = random_deserializer.deserialize();
265  presentState = "deserialized random state in begin lumi";
266  if (verbose)
267  std::cerr << uniqueID << " random " << randState.state_.size() << " " << randState.seed_ << std::endl;
268  randomService->setState(randState.state_, randState.seed_);
269  presentState = "processing begin lumi";
270  auto value = harness.getBeginLumiValue(iTransitionID);
271  value.randomState_.state_ = randomService->getState();
272  value.randomState_.seed_ = randomService->mySeed();
273 
274  presentState = "serialize lumi";
275  bl_serializer.serialize(value);
276  if (verbose)
277  std::cerr << uniqueID << " process: end beginLumi " << std::endl;
278  if (verbose)
279  std::cerr << uniqueID << " rand " << value.randomState_.state_.size() << " " << value.randomState_.seed_
280  << std::endl;
281  break;
282  }
283  case edm::Transition::Event: {
284  presentState = "begin event";
285  if (verbose)
286  std::cerr << uniqueID << " process: event " << counter << std::endl;
287  presentState = "deserialized random state in event";
288  auto randState = random_deserializer.deserialize();
289  randomService->setState(randState.state_, randState.seed_);
290  presentState = "processing event";
291  auto value = harness.getEventValue();
292  value.randomState_.state_ = randomService->getState();
293  value.randomState_.seed_ = randomService->mySeed();
294 
295  if (verbose)
296  std::cerr << uniqueID << " process: event " << counter << std::endl;
297 
298  presentState = "serialize event";
299  serializer.serialize(value);
300  if (verbose)
301  std::cerr << uniqueID << " process: "
302  << " " << counter << std::endl;
303  //usleep(10000000);
304  break;
305  }
307  presentState = "begin end lumi";
308  if (verbose)
309  std::cerr << uniqueID << " process: start endLumi " << std::endl;
310  presentState = "processing end lumi";
311  auto value = harness.getEndLumiValue();
312 
313  presentState = "serialize end lumi";
314  el_serializer.serialize(value);
315  if (verbose)
316  std::cerr << uniqueID << " process: end endLumi " << std::endl;
317 
318  break;
319  }
321  presentState = "begin end run";
322  if (verbose)
323  std::cerr << uniqueID << " process: start endRun " << std::endl;
324  presentState = "process end run";
325  auto value = harness.getEndRunValue();
326 
327  presentState = "serialize end run";
328  er_serializer.serialize(value);
329  if (verbose)
330  std::cerr << uniqueID << " process: end endRun " << std::endl;
331 
332  break;
333  }
334  default: {
335  assert(false);
336  }
337  }
338  presentState = "notifying and waiting after " + presentState;
339  if (verbose)
340  std::cerr << uniqueID << " process: notifying and waiting " << counter << std::endl;
341  });
342  }
343  } catch (std::exception const& iExcept) {
344  std::cerr << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"
345  << s_uniqueID << " process: caught exception \n"
346  << iExcept.what() << "\n"
347  << " while " << presentState << "\n"
348  << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n";
349  return 1;
350  } catch (...) {
351  std::cerr << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"
352  << s_uniqueID << " process: caught unknown exception\n while " << presentState << "\n"
353  << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n";
354  return 1;
355  }
356  return 0;
357 }
edmtest::ThingCollection getEndLumiValue()
Definition: interprocess.cc:47
#define CMS_SA_ALLOW
edmtest::ThingCollection getEventValue()
Definition: interprocess.cc:42
boost::interprocess::scoped_lock< boost::interprocess::named_mutex > * accessLock()
the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of ...
Definition: WorkerChannel.h:48
ExternalGeneratorEventInfo getEventValue()
bool verbose
static char const *const kHelpOpt
static char const *const kMemoryNameCommandOpt
edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi)
Definition: interprocess.cc:37
void setState(std::vector< unsigned long > const &, long seed)
GenRunInfoProduct getEndRunValue()
void setupSignalHandling()
Sets the unix signal handler which communicates with the thread.
static char const *const kMemoryNameOpt
static std::string s_uniqueID
void setAction(std::function< void()> iFunc)
assert(be >=bs)
static char const *const kHelpCommandOpt
GenLumiInfoProduct getEndLumiValue()
BufferInfo * toWorkerBufferInfo()
This can be used with ReadBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:51
void workerSetupDone()
Matches the ControllerChannel::setupWorker call.
Definition: WorkerChannel.h:56
Transition
Definition: Transition.h:12
static ServiceToken createContaining(std::unique_ptr< T > iService)
create a service token that holds the service defined by iService
static char const *const kVerboseOpt
#define CMS_THREAD_SAFE
ExternalGeneratorLumiInfo getBeginLumiValue(unsigned int iLumi)
Definition: value.py:1
static char const *const kVerboseCommandOpt
BufferInfo * fromWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:53
static char const *const kUniqueIDCommandOpt
Harness(std::string const &iConfig, edm::ServiceToken iToken)
edmtest::ThingCollection getEndRunValue()
Definition: interprocess.cc:52
static char const *const kUniqueIDOpt
HLT enums.
static std::atomic< unsigned int > counter
Definition: Config.py:1
int main(int argc, char *argv[])