CMS 3D CMS Logo

externalGenerator.cc
Go to the documentation of this file.
1 #include "boost/program_options.hpp"
2 
3 #include <atomic>
4 #include <csignal>
5 #include <iostream>
6 #include <string>
7 #include <thread>
8 #include <memory>
9 #include <filesystem>
10 #include <ctime>
11 
13 
21 
23 
30 
32 
//Names of the command line options.
// Each k...Opt constant is the long option name used below to query the parsed
// boost::program_options variables map; the matching k...CommandOpt constant is the
// "long,short" spelling handed to add_options() when the option is registered.
33 static char const* const kMemoryNameOpt = "memory-name";
34 static char const* const kMemoryNameCommandOpt = "memory-name,m";
35 static char const* const kUniqueIDOpt = "unique-id";
36 static char const* const kUniqueIDCommandOpt = "unique-id,i";
37 static char const* const kHelpOpt = "help";
38 static char const* const kHelpCommandOpt = "help,h";
39 static char const* const kVerboseOpt = "verbose";
40 static char const* const kVerboseCommandOpt = "verbose,v";
41 
42 //This application only uses 1 thread
44 
45 //NOTE: Can use TestProcessor as the harness for the worker
46 
namespace {
  //Based on MessageLogger time handling
  //Template showing the widest timestamp the buffer is sized for (3 letter zone plus padding)
  constexpr char timeFormat[] = "dd-Mon-yyyy hh:mm:ss TZN ";
  //Size of the timestamp buffer, including the terminating nul
  constexpr size_t kTimeSize = sizeof(timeFormat);

  //Returns the current local time formatted as "dd-Mon-yyyy hh:mm:ss TZN".
  // The returned buffer is always nul terminated so callers can stream data() directly.
  // If the time zone abbreviation makes the text too long for the buffer the time zone is
  // dropped; if the time cannot be converted or formatted at all an empty string is returned.
  std::array<char, kTimeSize> formattedTime() {
    //value-initialize so the buffer holds a valid (empty) C string on every path
    std::array<char, kTimeSize> ts{};

    auto const t = time(nullptr);
    struct tm timebuf;
    if (localtime_r(&t, &timebuf) == nullptr) {
      //conversion failed: strftime must not be given a null tm pointer
      return ts;
    }

    //strftime returns 0 and leaves the buffer contents unspecified when the result does not fit
    if (std::strftime(ts.data(), ts.size(), "%d-%b-%Y %H:%M:%S %Z", &timebuf) == 0) {
      ts.fill('\0');
      //retry without the time zone, which is the only variable width field
      if (std::strftime(ts.data(), ts.size(), "%d-%b-%Y %H:%M:%S", &timebuf) == 0) {
        ts.fill('\0');
      }
    }
    return ts;
  }
}  // namespace
60 
61 using namespace edm::shared_memory;
//Thin wrapper around the tester_ member which runs individual framework transitions
// and copies the products the external generator must send back out of the results.
// NOTE(review): this listing has dropped several lines of the class (the embedded
// numbering jumps 66->68, 73->75, 82->84, 87->89 and 93->95). The missing lines are
// presumably the member function declarations and the declaration of tester_ -
// confirm against the original file before editing.
62 class Harness {
63 public:
 //Constructs tester_ from a TestProcessor::Config built from iConfig together with iToken
64  Harness(std::string const& iConfig, edm::ServiceToken iToken)
65  : tester_(edm::test::TestProcessor::Config{iConfig}, iToken) {}
66 
 //(declaration line missing from listing) Runs testBeginLuminosityBlock(iLumi) and
 // returns an ExternalGeneratorLumiInfo whose header_ is a copy of the GenLumiInfoHeader
68  auto lumi = tester_.testBeginLuminosityBlock(iLumi);
69  ExternalGeneratorLumiInfo returnValue;
70  returnValue.header_ = *lumi.get<GenLumiInfoHeader>();
71  return returnValue;
72  }
73 
 //(declaration line missing from listing) Runs one test() call and copies out the
 // "unsmeared" HepMCProduct, the GenEventInfoProduct and whether the module passed
75  ExternalGeneratorEventInfo returnValue;
76  auto event = tester_.test();
77  returnValue.hepmc_ = *event.get<edm::HepMCProduct>("unsmeared");
78  returnValue.eventInfo_ = *event.get<GenEventInfoProduct>();
79  returnValue.keepEvent_ = event.modulePassed();
80  return returnValue;
81  }
82 
 //(declaration line missing from listing) Runs testEndLuminosityBlock() and returns
 // a copy of the GenLumiInfoProduct
84  auto lumi = tester_.testEndLuminosityBlock();
85  return *lumi.get<GenLumiInfoProduct>();
86  }
87 
 //(declaration line missing from listing) Runs testEndRun() and returns a copy of
 // the GenRunInfoProduct
89  auto run = tester_.testEndRun();
90  return *run.get<GenRunInfoProduct>();
91  }
92 
93 private:
 //(member declaration line missing from listing; the constructor initializes tester_)
95 };
96 
97 template <typename T>
99 
//File local state and handler used to release the shared lock when the process exits early
100 namespace {
101  //needed for atexit handling
 // s_sharedLock is set by main() while the communication channel exists and is reset to
 // nullptr before the channel (and with it the lock) goes away
102  CMS_THREAD_SAFE boost::interprocess::scoped_lock<boost::interprocess::named_mutex>* s_sharedLock = nullptr;
103 
 //Registered with std::atexit by main(). If the lock pointer is still set when the process
 // exits, report the early exit on std::cerr and unlock it; otherwise do nothing.
 // NOTE(review): s_uniqueID is used here but its declaration is not visible in this listing.
104  void atexit_handler() {
105  if (s_sharedLock) {
106  std::cerr << s_uniqueID << " process: early exit called: unlock " << formattedTime().data() << "\n";
107  s_sharedLock->unlock();
108  }
109  }
110 } // namespace
111 
//Entry point of the external generator worker process.
// Parses the command line (memory name and unique id are required), moves into a per-worker
// directory, attaches to the shared memory channel, reads the configuration from std::cin,
// builds the Harness and then answers the transitions delivered by the channel until
// handleTransitions returns.
// Returns 0 on success (or after printing help) and 1 on a command line error or when an
// exception is caught.
// presentState is updated before each step so the catch blocks can report what was being done.
// NOTE(review): this listing has dropped several source lines inside this function (the
// embedded numbering jumps 233->235, 244->246, 265->267, 274->276, 320->322 and 334->336).
// They presumably hold the declarations of `configuration` and `randomService` and the case
// labels for the begin run, begin lumi, end lumi and end run transitions - confirm against the
// original file.
112 int main(int argc, char* argv[]) {
113  std::string descString(argv[0]);
114  descString += " [--";
115  descString += kMemoryNameOpt;
116  descString += "] memory_name";
117  boost::program_options::options_description desc(descString);
118 
119  desc.add_options()(kHelpCommandOpt, "produce help message")(
120  kMemoryNameCommandOpt, boost::program_options::value<std::string>(), "memory name")(
121  kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id")(kVerboseCommandOpt,
122  "verbose output");
123 
 //allow the memory name and the unique id to be given positionally, in that order
 // NOTE(review): unique-id is registered with a positional count of 2 although it is a
 // single std::string value - confirm this is intended
124  boost::program_options::positional_options_description p;
125  p.add(kMemoryNameOpt, 1);
126  p.add(kUniqueIDOpt, 2);
127 
128  boost::program_options::options_description all_options("All Options");
129  all_options.add(desc);
130 
131  boost::program_options::variables_map vm;
132  try {
133  store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
134  notify(vm);
135  } catch (boost::program_options::error const& iException) {
 // NOTE(review): the hint names 'cmsRun --help' while the message is prefixed with argv[0]
136  std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
137  << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
138  return 1;
139  }
140 
141  if (vm.count(kHelpOpt)) {
142  std::cout << desc << std::endl;
143  return 0;
144  }
145 
146  bool verbose = false;
147  if (vm.count(kVerboseOpt)) {
148  verbose = true;
149  }
150 
 //both the memory name and the unique id are mandatory
151  if (!vm.count(kMemoryNameOpt)) {
152  std::cout << " no argument given" << std::endl;
153  return 1;
154  }
155 
156  if (!vm.count(kUniqueIDOpt)) {
157  std::cout << " no second argument given" << std::endl;
158  return 1;
159  }
160 
161  using namespace std::string_literals;
162  using namespace std::filesystem;
163 
 //run inside a directory named "thread<unique id>", created relative to the current directory
164  auto newDir = path("thread"s + vm[kUniqueIDOpt].as<std::string>());
165  create_directory(newDir);
166  current_path(newDir);
167 
168  WorkerMonitorThread monitorThread;
169 
170  monitorThread.startThread();
171 
172  std::string presentState = "setting up communicationChannel";
173 
174  CMS_SA_ALLOW try {
175  std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
176  std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
 //keep a copy in the file level variable so the handlers and catch blocks can print it
177  s_uniqueID = uniqueID;
178  {
179  //This class is holding the lock
180  WorkerChannel communicationChannel(memoryName, uniqueID);
181 
182  presentState = "setting up read/write buffers";
183  WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
184  ReadBuffer sm_readbuffer{std::string("Rand") + memoryName, communicationChannel.toWorkerBufferInfo()};
 //number of transitions handled so far; only used in the verbose output
185  int counter = 0;
186 
187  presentState = "setting up monitor thread";
188  //The lock must be released if there is a catastrophic signal
189  auto lockPtr = communicationChannel.accessLock();
190 
191  monitorThread.setAction([lockPtr]() {
192  if (lockPtr) {
193  std::cerr << s_uniqueID << " process: SIGNAL CAUGHT: unlock " << formattedTime().data() << "\n";
194  lockPtr->unlock();
195  }
196  });
197 
198  presentState = "setting up termination handler";
199  //be sure to unset the address of the shared lock before the lock goes away
200  s_sharedLock = lockPtr;
 //sharedLockGuard exists only for its deleter, which resets s_sharedLock to nullptr when
 // this scope is left
201  auto unsetLockPtr = [](void*) { s_sharedLock = nullptr; };
202  std::unique_ptr<decltype(s_sharedLock), decltype(unsetLockPtr)> sharedLockGuard{&s_sharedLock, unsetLockPtr};
203  std::atexit(atexit_handler);
 //terminate handler: unlock the shared lock, restore the default SIGABRT disposition and abort
 // NOTE(review): when s_sharedLock is null this handler returns without calling std::abort();
 // a terminate handler is not expected to return - confirm this path cannot be reached
204  auto releaseLock = []() {
205  if (s_sharedLock) {
206  std::cerr << s_uniqueID << " process: terminate called: unlock " << formattedTime().data() << "\n";
207  s_sharedLock->unlock();
208  s_sharedLock = nullptr;
209  //deactivate the abort signal
210 
211  struct sigaction act;
212  act.sa_sigaction = nullptr;
213  act.sa_flags = SA_SIGINFO;
214  sigemptyset(&act.sa_mask);
215  sigaction(SIGABRT, &act, nullptr);
216  std::abort();
217  }
218  };
219  std::set_terminate(releaseLock);
220 
 //all four serializers write into the same outgoing shared memory buffer
221  presentState = "setting up serializers";
222  Serializer<ExternalGeneratorEventInfo> serializer(sm_buffer);
223  Serializer<ExternalGeneratorLumiInfo> bl_serializer(sm_buffer);
224  Serializer<GenLumiInfoProduct> el_serializer(sm_buffer);
225  Serializer<GenRunInfoProduct> er_serializer(sm_buffer);
226 
227  ROOTDeserializer<edm::RandomNumberGeneratorState, ReadBuffer> random_deserializer(sm_readbuffer);
228 
 //the configuration arrives on std::cin as a line count followed by that many lines
 // NOTE(review): operator>> leaves the rest of the count line unread, so the first
 // std::getline below returns the remainder of that line (normally empty); presumably the
 // sender accounts for this in the count it writes - confirm
229  presentState = "reading configuration";
230  std::cerr << uniqueID << " process: initializing " << formattedTime().data() << std::endl;
231  int nlines;
232  std::cin >> nlines;
233 
235  for (int i = 0; i < nlines; ++i) {
236  std::string c;
237  std::getline(std::cin, c);
238  if (verbose) {
239  std::cerr << c << "\n";
240  }
241  configuration += c + "\n";
242  }
243 
244  presentState = "setting up random number generator";
 //the service token takes ownership of randomService through the unique_ptr; the raw
 // pointer is still used below to set and read the generator state
246  auto serviceToken =
247  edm::ServiceRegistry::createContaining(std::unique_ptr<edm::RandomNumberGenerator>(randomService));
248  Harness harness(configuration, serviceToken);
249 
250  //Some generator libraries override the signal handlers
 // so the signal handling and the terminate handler are installed again after the Harness
 // has been constructed
251  monitorThread.setupSignalHandling();
252  std::set_terminate(releaseLock);
253 
254  if (verbose) {
255  std::cerr << uniqueID << " process: done initializing " << formattedTime().data() << std::endl;
256  }
257  presentState = "finished initialization";
258  communicationChannel.workerSetupDone();
259 
260  presentState = "waiting for transition";
261  if (verbose)
262  std::cerr << uniqueID << " process: waiting " << counter << " " << formattedTime().data() << std::endl;
 //the callback below is invoked by the channel with the transition kind and its id
263  communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
264  ++counter;
265  switch (iTransition) {
 //(case label missing from listing) begin run: nothing is processed or sent, only logging
267  presentState = "beginRun transition";
268  if (verbose)
269  std::cerr << uniqueID << " process: start beginRun " << formattedTime().data() << std::endl;
270  if (verbose)
271  std::cerr << uniqueID << " process: end beginRun " << formattedTime().data() << std::endl;
272 
273  break;
274  }
 //(case label missing from listing) begin lumi: load the random state that was sent,
 // run the begin lumi transition and send the result back with the updated random state
276  presentState = "begin lumi";
277  if (verbose)
278  std::cerr << uniqueID << " process: start beginLumi " << formattedTime().data() << std::endl;
279  auto randState = random_deserializer.deserialize();
280  presentState = "deserialized random state in begin lumi";
281  if (verbose)
282  std::cerr << uniqueID << " random " << randState.state_.size() << " " << randState.seed_ << std::endl;
283  randomService->setState(randState.state_, randState.seed_);
284  presentState = "processing begin lumi";
285  auto value = harness.getBeginLumiValue(iTransitionID);
286  value.randomState_.state_ = randomService->getState();
287  value.randomState_.seed_ = randomService->mySeed();
288 
289  presentState = "serialize lumi";
290  bl_serializer.serialize(value);
291  if (verbose)
292  std::cerr << uniqueID << " process: end beginLumi " << formattedTime().data() << std::endl;
293  if (verbose)
294  std::cerr << uniqueID << " rand " << value.randomState_.state_.size() << " " << value.randomState_.seed_
295  << std::endl;
296  break;
297  }
 //event: load the random state that was sent, generate one event and send it back
 // together with the updated random state
298  case edm::Transition::Event: {
299  presentState = "begin event";
300  if (verbose)
301  std::cerr << uniqueID << " process: event " << counter << " " << formattedTime().data() << std::endl;
302  presentState = "deserialized random state in event";
303  auto randState = random_deserializer.deserialize();
304  randomService->setState(randState.state_, randState.seed_);
305  presentState = "processing event";
306  auto value = harness.getEventValue();
307  value.randomState_.state_ = randomService->getState();
308  value.randomState_.seed_ = randomService->mySeed();
309 
310  if (verbose)
311  std::cerr << uniqueID << " process: event " << counter << " " << formattedTime().data() << std::endl;
312 
313  presentState = "serialize event";
314  serializer.serialize(value);
315  if (verbose)
316  std::cerr << uniqueID << " process: "
317  << " " << counter << std::endl;
318  //usleep(10000000);
319  break;
320  }
 //(case label missing from listing) end lumi: send back the end of lumi product
322  presentState = "begin end lumi";
323  if (verbose)
324  std::cerr << uniqueID << " process: start endLumi " << formattedTime().data() << std::endl;
325  presentState = "processing end lumi";
326  auto value = harness.getEndLumiValue();
327 
328  presentState = "serialize end lumi";
329  el_serializer.serialize(value);
330  if (verbose)
331  std::cerr << uniqueID << " process: end endLumi " << formattedTime().data() << std::endl;
332 
333  break;
334  }
 //(case label missing from listing) end run: send back the end of run product
336  presentState = "begin end run";
337  if (verbose)
338  std::cerr << uniqueID << " process: start endRun " << formattedTime().data() << std::endl;
339  presentState = "process end run";
340  auto value = harness.getEndRunValue();
341 
342  presentState = "serialize end run";
343  er_serializer.serialize(value);
344  if (verbose)
345  std::cerr << uniqueID << " process: end endRun " << formattedTime().data() << std::endl;
346 
347  break;
348  }
 //any other transition is treated as a programming error
349  default: {
350  assert(false);
351  }
352  }
353  presentState = "notifying and waiting after " + presentState;
354  if (verbose)
355  std::cerr << uniqueID << " process: notifying and waiting " << counter << " " << std::endl;
356  });
357  }
 //both handlers report the failure on std::cerr, including the last presentState, and return 1
358  } catch (std::exception const& iExcept) {
359  std::cerr << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"
360  << s_uniqueID << " process: caught exception \n"
361  << iExcept.what() << " " << formattedTime().data() << "\n"
362  << " while " << presentState << "\n"
363  << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n";
364  return 1;
365  } catch (...) {
366  std::cerr << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"
367  << s_uniqueID << " process: caught unknown exception " << formattedTime().data() << "\n while "
368  << presentState << "\n"
369  << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n";
370  return 1;
371  }
372  return 0;
373 }
edmtest::ThingCollection getEndLumiValue()
Definition: interprocess.cc:47
#define CMS_SA_ALLOW
edmtest::ThingCollection getEventValue()
Definition: interprocess.cc:42
boost::interprocess::scoped_lock< boost::interprocess::named_mutex > * accessLock()
the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of ...
Definition: WorkerChannel.h:48
ExternalGeneratorEventInfo getEventValue()
bool verbose
static char const *const kHelpOpt
static char const *const kMemoryNameCommandOpt
edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi)
Definition: interprocess.cc:37
void setState(std::vector< unsigned long > const &, long seed)
GenRunInfoProduct getEndRunValue()
void setupSignalHandling()
Sets the unix signal handler which communicates with the thread.
static char const *const kMemoryNameOpt
static std::string s_uniqueID
void setAction(std::function< void()> iFunc)
assert(be >=bs)
static char const *const kHelpCommandOpt
GenLumiInfoProduct getEndLumiValue()
BufferInfo * toWorkerBufferInfo()
This can be used with ReadBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:51
void workerSetupDone()
Matches the ControllerChannel::setupWorker call.
Definition: WorkerChannel.h:56
dictionary configuration
Definition: JetHT_cfg.py:38
Transition
Definition: Transition.h:12
static ServiceToken createContaining(std::unique_ptr< T > iService)
create a service token that holds the service defined by iService
static char const *const kVerboseOpt
#define CMS_THREAD_SAFE
ExternalGeneratorLumiInfo getBeginLumiValue(unsigned int iLumi)
Definition: value.py:1
static char const *const kVerboseCommandOpt
BufferInfo * fromWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:53
static char const *const kUniqueIDCommandOpt
Harness(std::string const &iConfig, edm::ServiceToken iToken)
edmtest::ThingCollection getEndRunValue()
Definition: interprocess.cc:52
static char const *const kUniqueIDOpt
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
static std::atomic< unsigned int > counter
Definition: Config.py:1
int main(int argc, char *argv[])