CMS 3D CMS Logo

interprocess_random.cc
Go to the documentation of this file.
1 #include "boost/program_options.hpp"
2 
3 #include <atomic>
4 #include <csignal>
5 #include <iostream>
6 #include <string>
7 #include <thread>
8 
10 #include "DataFormats/TestObjects/interface/ToyProducts.h"
12 
14 
21 
22 static char const* const kMemoryNameOpt = "memory-name";
23 static char const* const kMemoryNameCommandOpt = "memory-name,m";
24 static char const* const kUniqueIDOpt = "unique-id";
25 static char const* const kUniqueIDCommandOpt = "unique-id,i";
26 static char const* const kHelpOpt = "help";
27 static char const* const kHelpCommandOpt = "help,h";
28 
29 //NOTE: Can use TestProcessor as the harness for the worker
30 
31 using SentType = std::pair<edmtest::IntProduct, edm::RandomNumberGeneratorState>;
32 
33 using namespace edm::shared_memory;
34 class Harness {
35 public:
36  Harness(std::string const& iConfig, edm::ServiceToken iToken)
37  : tester_(edm::test::TestProcessor::Config{iConfig}, iToken) {}
38 
39  edmtest::IntProduct getBeginLumiValue(unsigned int iLumi) {
40  auto lumi = tester_.testBeginLuminosityBlock(iLumi);
41  return *lumi.get<edmtest::IntProduct>("lumi");
42  }
43 
44  edmtest::IntProduct getEventValue() {
45  auto event = tester_.test();
46  return *event.get<edmtest::IntProduct>();
47  }
48 
49 private:
51 };
52 
53 int main(int argc, char* argv[]) {
54  std::string descString(argv[0]);
55  descString += " [--";
56  descString += kMemoryNameOpt;
57  descString += "] memory_name";
58  boost::program_options::options_description desc(descString);
59 
60  desc.add_options()(kHelpCommandOpt, "produce help message")(
62  boost::program_options::value<std::string>(),
63  "memory name")(kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id");
64 
65  boost::program_options::positional_options_description p;
66  p.add(kMemoryNameOpt, 1);
67  p.add(kUniqueIDOpt, 2);
68 
69  boost::program_options::options_description all_options("All Options");
70  all_options.add(desc);
71 
72  boost::program_options::variables_map vm;
73  try {
74  store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
75  notify(vm);
76  } catch (boost::program_options::error const& iException) {
77  std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
78  << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
79  return 1;
80  }
81 
82  if (vm.count(kHelpOpt)) {
83  std::cout << desc << std::endl;
84  return 0;
85  }
86 
87  if (!vm.count(kMemoryNameOpt)) {
88  std::cout << " no argument given" << std::endl;
89  return 1;
90  }
91 
92  if (!vm.count(kUniqueIDOpt)) {
93  std::cout << " no second argument given" << std::endl;
94  return 1;
95  }
96 
97  WorkerMonitorThread monitorThread;
98 
99  monitorThread.startThread();
100 
101  CMS_SA_ALLOW try {
102  std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
103  std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
104  {
105  //This class is holding the lock
106  WorkerChannel communicationChannel(memoryName, uniqueID);
107 
108  WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
109  ReadBuffer sm_readbuffer{std::string("Rand") + memoryName, communicationChannel.toWorkerBufferInfo()};
110  int counter = 0;
111 
112  //The lock must be released if there is a catastrophic signal
113  auto lockPtr = communicationChannel.accessLock();
114  monitorThread.setAction([lockPtr]() {
115  if (lockPtr) {
116  std::cerr << "SIGNAL CAUGHT: unlock\n";
117  lockPtr->unlock();
118  }
119  });
120 
121  using TCSerializer = ROOTSerializer<SentType, WriteBuffer>;
122  TCSerializer serializer(sm_buffer);
123  TCSerializer bl_serializer(sm_buffer);
124 
126  TCDeserializer random_deserializer(sm_readbuffer);
127 
128  std::cerr << uniqueID << " process: initializing " << std::endl;
129  int nlines;
130  std::cin >> nlines;
131 
133  for (int i = 0; i < nlines; ++i) {
134  std::string c;
135  std::getline(std::cin, c);
136  std::cerr << c << "\n";
137  configuration += c + "\n";
138  }
139 
141  auto serviceToken =
142  edm::ServiceRegistry::createContaining(std::unique_ptr<edm::RandomNumberGenerator>(randomService));
143 
144  Harness harness(configuration, serviceToken);
145 
146  //Either ROOT or the Framework are overriding the signal handlers
147  monitorThread.setupSignalHandling();
148 
149  std::cerr << uniqueID << " process: done initializing" << std::endl;
150  communicationChannel.workerSetupDone();
151 
152  std::cerr << uniqueID << " process: waiting " << counter << std::endl;
153  communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
154  ++counter;
155  switch (iTransition) {
157  std::cerr << uniqueID << " process: start beginLumi " << std::endl;
158  auto randState = random_deserializer.deserialize();
159  std::cerr << " state " << randState.seed_ << std::endl;
160  randomService->setState(randState.state_, randState.seed_);
161  SentType toSend;
162  toSend.first = harness.getBeginLumiValue(iTransitionID);
163  toSend.second.state_ = randomService->getState();
164  toSend.second.seed_ = randomService->mySeed();
165  bl_serializer.serialize(toSend);
166  std::cerr << uniqueID << " process: end beginLumi " << toSend.first.value << std::endl;
167 
168  break;
169  }
170  case edm::Transition::Event: {
171  std::cerr << uniqueID << " process: begin event " << counter << std::endl;
172  auto randState = random_deserializer.deserialize();
173  randomService->setState(randState.state_, randState.seed_);
174  SentType toSend;
175  toSend.first = harness.getEventValue();
176  toSend.second.state_ = randomService->getState();
177  toSend.second.seed_ = randomService->mySeed();
178  std::cerr << uniqueID << " process: end event " << counter << std::endl;
179 
180  serializer.serialize(toSend);
181  std::cerr << uniqueID << " process: " << toSend.first.value << " " << counter << std::endl;
182  //usleep(10000000);
183  break;
184  }
185  default: {
186  assert(false);
187  }
188  }
189  std::cerr << uniqueID << " process: notifying and waiting" << counter << std::endl;
190  });
191  }
192  } catch (std::exception const& iExcept) {
193  std::cerr << "caught exception \n" << iExcept.what() << "\n";
194  return 1;
195  } catch (...) {
196  std::cerr << "caught unknown exception";
197  return 1;
198  }
199  return 0;
200 }
counter
Definition: counter.py:1
cmsBatch.argv
argv
Definition: cmsBatch.py:279
mps_fire.i
i
Definition: mps_fire.py:428
dir2webdir.argc
argc
Definition: dir2webdir.py:39
ReadBuffer.h
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::Transition::Event
TestProcessor
gather_cfg.cout
cout
Definition: gather_cfg.py:144
cms::cuda::assert
assert(be >=bs)
edm::shared_memory::WorkerMonitorThread
Definition: WorkerMonitorThread.h:32
kHelpOpt
static char const *const kHelpOpt
Definition: interprocess_random.cc:26
edm::ExternalRandomNumberGeneratorService::mySeed
std::uint32_t mySeed() const final
Definition: ExternalRandomNumberGeneratorService.cc:69
RandomNumberGeneratorState.h
edm::shared_memory::ROOTSerializer
Definition: ROOTSerializer.h:31
Harness
Definition: interprocess.cc:28
relativeConstraints.error
error
Definition: relativeConstraints.py:53
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::shared_memory::WorkerChannel::fromWorkerBufferInfo
BufferInfo * fromWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:53
TestProcessor.h
edm::shared_memory::ROOTDeserializer
Definition: ROOTDeserializer.h:31
Harness::getBeginLumiValue
edmtest::IntProduct getBeginLumiValue(unsigned int iLumi)
Definition: interprocess_random.cc:39
edm::shared_memory::WriteBuffer
Definition: WriteBuffer.h:37
edm::shared_memory::WorkerChannel::accessLock
boost::interprocess::scoped_lock< boost::interprocess::named_mutex > * accessLock()
the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of ...
Definition: WorkerChannel.h:48
edm::shared_memory::WorkerMonitorThread::setupSignalHandling
void setupSignalHandling()
Sets the unix signal handler which communicates with the thread.
Definition: WorkerMonitorThread.cc:102
kUniqueIDCommandOpt
static char const *const kUniqueIDCommandOpt
Definition: interprocess_random.cc:25
edm::ServiceToken
Definition: ServiceToken.h:42
test
Definition: SmallWORMDict.h:13
edm::ExternalRandomNumberGeneratorService::setState
void setState(std::vector< unsigned long > const &, long seed)
Definition: ExternalRandomNumberGeneratorService.cc:24
edm::shared_memory::WorkerMonitorThread::startThread
void startThread()
Definition: WorkerMonitorThread.cc:78
Harness::Harness
Harness(std::string const &iConfig, edm::ServiceToken iToken)
Definition: interprocess_random.cc:36
edm::shared_memory::WorkerMonitorThread::setAction
void setAction(std::function< void()> iFunc)
Definition: WorkerMonitorThread.h:48
edm::Transition::BeginLuminosityBlock
kMemoryNameCommandOpt
static char const *const kMemoryNameCommandOpt
Definition: interprocess_random.cc:23
Harness::getBeginLumiValue
edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi)
Definition: interprocess.cc:37
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
edm::Transition
Transition
Definition: Transition.h:12
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
ctppsRawToDigi_cff.configuration
configuration
Definition: ctppsRawToDigi_cff.py:11
Harness::getEventValue
edmtest::ThingCollection getEventValue()
Definition: interprocess.cc:42
WorkerChannel.h
ExternalRandomNumberGeneratorService.h
counter
static std::atomic< unsigned int > counter
Definition: SharedResourceNames.cc:18
edm::test::TestProcessor
Definition: TestProcessor.h:159
edm::shared_memory
Definition: buffer_names.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::shared_memory::WorkerChannel::handleTransitions
void handleTransitions(F &&iF)
Definition: WorkerChannel.h:65
edm::shared_memory::WorkerChannel
Definition: WorkerChannel.h:35
kMemoryNameOpt
static char const *const kMemoryNameOpt
Definition: interprocess_random.cc:22
edm::ExternalRandomNumberGeneratorService::getState
std::vector< unsigned long > getState() const
Definition: ExternalRandomNumberGeneratorService.cc:32
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
writedatasetfile.run
run
Definition: writedatasetfile.py:27
edm::ServiceRegistry::createContaining
static ServiceToken createContaining(std::unique_ptr< T > iService)
create a service token that holds the service defined by iService
Definition: ServiceRegistry.h:99
ROOTDeserializer.h
AlcaSiPixelAliHarvester0T_cff.options
options
Definition: AlcaSiPixelAliHarvester0T_cff.py:42
WriteBuffer.h
kUniqueIDOpt
static char const *const kUniqueIDOpt
Definition: interprocess_random.cc:24
WorkerMonitorThread.h
main
int main(int argc, char *argv[])
Definition: interprocess_random.cc:53
edm::shared_memory::ReadBuffer
Definition: ReadBuffer.h:34
Config
Definition: Config.py:1
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:56
Harness::getEventValue
edmtest::IntProduct getEventValue()
Definition: interprocess_random.cc:44
edm::ExternalRandomNumberGeneratorService
Definition: ExternalRandomNumberGeneratorService.h:20
edm::shared_memory::WorkerChannel::workerSetupDone
void workerSetupDone()
Matches the ControllerChannel::setupWorker call.
Definition: WorkerChannel.h:56
EcnaPython_AdcPeg12_S1_10_R170298_1_0_150_Dee0.cerr
cerr
Definition: EcnaPython_AdcPeg12_S1_10_R170298_1_0_150_Dee0.py:8
lumi
Definition: LumiSectionData.h:20
edm::shared_memory::WorkerChannel::toWorkerBufferInfo
BufferInfo * toWorkerBufferInfo()
This can be used with ReadBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:51
SentType
std::pair< edmtest::IntProduct, edm::RandomNumberGeneratorState > SentType
Definition: interprocess_random.cc:31
ROOTSerializer.h
kHelpCommandOpt
static char const *const kHelpCommandOpt
Definition: interprocess_random.cc:27