CMS 3D CMS Logo

interprocess.cc
Go to the documentation of this file.
1 #include "boost/program_options.hpp"
2 
3 #include <atomic>
4 #include <csignal>
5 #include <iostream>
6 #include <string>
7 #include <thread>
8 
10 #include "DataFormats/TestObjects/interface/ToyProducts.h"
11 #include "DataFormats/TestObjects/interface/ThingCollection.h"
12 
17 
18 static char const* const kMemoryNameOpt = "memory-name";
19 static char const* const kMemoryNameCommandOpt = "memory-name,m";
20 static char const* const kUniqueIDOpt = "unique-id";
21 static char const* const kUniqueIDCommandOpt = "unique-id,i";
22 static char const* const kHelpOpt = "help";
23 static char const* const kHelpCommandOpt = "help,h";
24 
25 //NOTE: Can use TestProcessor as the harness for the worker
26 
27 using namespace edm::shared_memory;
28 class Harness {
29 public:
30  Harness(std::string const& iConfig) : tester_(edm::test::TestProcessor::Config{iConfig}) {}
31 
32  edmtest::ThingCollection getBeginRunValue(unsigned int iRun) {
33  auto run = tester_.testBeginRun(iRun);
34  return *run.get<edmtest::ThingCollection>("beginRun");
35  }
36 
37  edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi) {
38  auto lumi = tester_.testBeginLuminosityBlock(iLumi);
39  return *lumi.get<edmtest::ThingCollection>("beginLumi");
40  }
41 
42  edmtest::ThingCollection getEventValue() {
43  auto event = tester_.test();
44  return *event.get<edmtest::ThingCollection>();
45  }
46 
47  edmtest::ThingCollection getEndLumiValue() {
48  auto lumi = tester_.testEndLuminosityBlock();
49  return *lumi.get<edmtest::ThingCollection>("endLumi");
50  }
51 
52  edmtest::ThingCollection getEndRunValue() {
53  auto run = tester_.testEndRun();
54  return *run.get<edmtest::ThingCollection>("endRun");
55  }
56 
57 private:
59 };
60 
61 int main(int argc, char* argv[]) {
62  std::string descString(argv[0]);
63  descString += " [--";
64  descString += kMemoryNameOpt;
65  descString += "] memory_name";
66  boost::program_options::options_description desc(descString);
67 
68  desc.add_options()(kHelpCommandOpt, "produce help message")(
69  kMemoryNameCommandOpt, boost::program_options::value<std::string>(), "memory name")(
70  kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id");
71 
72  boost::program_options::positional_options_description p;
73  p.add(kMemoryNameOpt, 1);
74  p.add(kUniqueIDOpt, 2);
75 
76  boost::program_options::options_description all_options("All Options");
77  all_options.add(desc);
78 
79  boost::program_options::variables_map vm;
80  try {
81  store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
82  notify(vm);
83  } catch (boost::program_options::error const& iException) {
84  std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
85  << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
86  return 1;
87  }
88 
89  if (vm.count(kHelpOpt)) {
90  std::cout << desc << std::endl;
91  return 0;
92  }
93 
94  if (!vm.count(kMemoryNameOpt)) {
95  std::cout << " no argument given" << std::endl;
96  return 1;
97  }
98 
99  if (!vm.count(kUniqueIDOpt)) {
100  std::cout << " no second argument given" << std::endl;
101  return 1;
102  }
103 
104  WorkerMonitorThread monitorThread;
105 
106  monitorThread.startThread();
107 
108  CMS_SA_ALLOW try {
109  std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
110  std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
111  {
112  //using namespace boost::interprocess;
113  //auto controlNameUnique = unique_name(memoryName, uniqueID);
114 
115  //This class is holding the lock
116  WorkerChannel communicationChannel(memoryName, uniqueID);
117 
118  WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
119  int counter = 0;
120 
121  //The lock must be released if there is a catastrophic signal
122  auto lockPtr = communicationChannel.accessLock();
123  monitorThread.setAction([lockPtr]() {
124  if (lockPtr) {
125  std::cerr << "SIGNAL CAUGHT: unlock\n";
126  lockPtr->unlock();
127  }
128  });
129 
131  TCSerializer serializer(sm_buffer);
132  TCSerializer br_serializer(sm_buffer);
133  TCSerializer bl_serializer(sm_buffer);
134  TCSerializer el_serializer(sm_buffer);
135  TCSerializer er_serializer(sm_buffer);
136 
137  std::cerr << uniqueID << " process: initializing " << std::endl;
138  int nlines;
139  std::cin >> nlines;
140 
142  for (int i = 0; i < nlines; ++i) {
143  std::string c;
144  std::getline(std::cin, c);
145  std::cerr << c << "\n";
146  configuration += c + "\n";
147  }
148 
149  Harness harness(configuration);
150 
151  //Either ROOT or the Framework are overriding the signal handlers
152  monitorThread.setupSignalHandling();
153 
154  std::cerr << uniqueID << " process: done initializing" << std::endl;
155  communicationChannel.workerSetupDone();
156 
157  std::cerr << uniqueID << " process: waiting " << counter << std::endl;
158  communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
159  ++counter;
160  switch (iTransition) {
162  std::cerr << uniqueID << " process: start beginRun " << std::endl;
163  auto value = harness.getBeginRunValue(iTransitionID);
164 
165  br_serializer.serialize(value);
166  std::cerr << uniqueID << " process: end beginRun " << value.size() << std::endl;
167 
168  break;
169  }
171  std::cerr << uniqueID << " process: start beginLumi " << std::endl;
172  auto value = harness.getBeginLumiValue(iTransitionID);
173 
174  bl_serializer.serialize(value);
175  std::cerr << uniqueID << " process: end beginLumi " << value.size() << std::endl;
176 
177  break;
178  }
179  case edm::Transition::Event: {
180  std::cerr << uniqueID << " process: integrating " << counter << std::endl;
181  auto value = harness.getEventValue();
182 
183  std::cerr << uniqueID << " process: integrated " << counter << std::endl;
184 
185  serializer.serialize(value);
186  std::cerr << uniqueID << " process: " << value.size() << " " << counter << std::endl;
187  //usleep(10000000);
188  break;
189  }
191  std::cerr << uniqueID << " process: start endLumi " << std::endl;
192  auto value = harness.getEndLumiValue();
193 
194  el_serializer.serialize(value);
195  std::cerr << uniqueID << " process: end endLumi " << value.size() << std::endl;
196 
197  break;
198  }
200  std::cerr << uniqueID << " process: start endRun " << std::endl;
201  auto value = harness.getEndRunValue();
202 
203  er_serializer.serialize(value);
204  std::cerr << uniqueID << " process: end endRun " << value.size() << std::endl;
205 
206  break;
207  }
208  default: {
209  assert(false);
210  }
211  }
212  std::cerr << uniqueID << " process: notifying and waiting" << counter << std::endl;
213  });
214  }
215  } catch (std::exception const& iExcept) {
216  std::cerr << "caught exception \n" << iExcept.what() << "\n";
217  return 1;
218  } catch (...) {
219  std::cerr << "caught unknown exception";
220  return 1;
221  }
222  return 0;
223 }
counter
Definition: counter.py:1
cmsBatch.argv
argv
Definition: cmsBatch.py:279
Harness::Harness
Harness(std::string const &iConfig)
Definition: interprocess.cc:30
Harness::getEndRunValue
edmtest::ThingCollection getEndRunValue()
Definition: interprocess.cc:52
mps_fire.i
i
Definition: mps_fire.py:355
dir2webdir.argc
argc
Definition: dir2webdir.py:39
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::Transition::Event
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
TestProcessor
gather_cfg.cout
cout
Definition: gather_cfg.py:144
cms::cuda::assert
assert(be >=bs)
edm::shared_memory::WorkerMonitorThread
Definition: WorkerMonitorThread.h:32
main
int main(int argc, char *argv[])
Definition: interprocess.cc:61
edm::shared_memory::ROOTSerializer
Definition: ROOTSerializer.h:31
Harness::tester_
edm::test::TestProcessor tester_
Definition: interprocess.cc:58
Harness
Definition: interprocess.cc:28
relativeConstraints.error
error
Definition: relativeConstraints.py:53
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::shared_memory::WorkerChannel::fromWorkerBufferInfo
BufferInfo * fromWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:53
TestProcessor.h
edm::shared_memory::WriteBuffer
Definition: WriteBuffer.h:37
edm::shared_memory::WorkerChannel::accessLock
boost::interprocess::scoped_lock< boost::interprocess::named_mutex > * accessLock()
the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of ...
Definition: WorkerChannel.h:48
edm::shared_memory::WorkerMonitorThread::setupSignalHandling
void setupSignalHandling()
Sets the unix signal handler which communicates with the thread.
Definition: WorkerMonitorThread.cc:102
test
Definition: SmallWORMDict.h:13
edm::shared_memory::WorkerMonitorThread::startThread
void startThread()
Definition: WorkerMonitorThread.cc:78
kUniqueIDCommandOpt
static char const *const kUniqueIDCommandOpt
Definition: interprocess.cc:21
edm::shared_memory::WorkerMonitorThread::setAction
void setAction(std::function< void()> iFunc)
Definition: WorkerMonitorThread.h:48
kMemoryNameOpt
static char const *const kMemoryNameOpt
Definition: interprocess.cc:18
edm::Transition::BeginLuminosityBlock
Harness::getBeginLumiValue
edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi)
Definition: interprocess.cc:37
kUniqueIDOpt
static char const *const kUniqueIDOpt
Definition: interprocess.cc:20
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
edm::Transition
Transition
Definition: Transition.h:12
ctppsRawToDigi_cff.configuration
configuration
Definition: ctppsRawToDigi_cff.py:11
beam_dqm_sourceclient-live_cfg.cerr
cerr
Definition: beam_dqm_sourceclient-live_cfg.py:17
Harness::getEventValue
edmtest::ThingCollection getEventValue()
Definition: interprocess.cc:42
WorkerChannel.h
value
Definition: value.py:1
counter
static std::atomic< unsigned int > counter
Definition: SharedResourceNames.cc:15
edm::test::TestProcessor
Definition: TestProcessor.h:154
Harness::getBeginRunValue
edmtest::ThingCollection getBeginRunValue(unsigned int iRun)
Definition: interprocess.cc:32
edm::shared_memory
Definition: buffer_names.h:27
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
kHelpOpt
static char const *const kHelpOpt
Definition: interprocess.cc:22
edm::shared_memory::WorkerChannel::handleTransitions
void handleTransitions(F &&iF)
Definition: WorkerChannel.h:65
Harness::getEndLumiValue
edmtest::ThingCollection getEndLumiValue()
Definition: interprocess.cc:47
edm::shared_memory::WorkerChannel
Definition: WorkerChannel.h:35
kHelpCommandOpt
static char const *const kHelpCommandOpt
Definition: interprocess.cc:23
edm::Transition::EndLuminosityBlock
writedatasetfile.run
run
Definition: writedatasetfile.py:27
edm::Transition::BeginRun
AlcaSiPixelAliHarvester0T_cff.options
options
Definition: AlcaSiPixelAliHarvester0T_cff.py:42
WriteBuffer.h
WorkerMonitorThread.h
kMemoryNameCommandOpt
static char const *const kMemoryNameCommandOpt
Definition: interprocess.cc:19
Config
Definition: Config.py:1
edm::Transition::EndRun
edm::shared_memory::WorkerChannel::workerSetupDone
void workerSetupDone()
Matches the ControllerChannel::setupWorker call.
Definition: WorkerChannel.h:56
lumi
Definition: LumiSectionData.h:20
ROOTSerializer.h