CMS 3D CMS Logo

interprocess.cc
Go to the documentation of this file.
1 #include "boost/program_options.hpp"
2 
3 #include <atomic>
4 #include <csignal>
5 #include <iostream>
6 #include <string>
7 #include <thread>
8 
10 #include "DataFormats/TestObjects/interface/ToyProducts.h"
11 #include "DataFormats/TestObjects/interface/ThingCollection.h"
12 
17 
18 static char const* const kMemoryNameOpt = "memory-name";
19 static char const* const kMemoryNameCommandOpt = "memory-name,m";
20 static char const* const kUniqueIDOpt = "unique-id";
21 static char const* const kUniqueIDCommandOpt = "unique-id,i";
22 static char const* const kHelpOpt = "help";
23 static char const* const kHelpCommandOpt = "help,h";
24 
25 //NOTE: Can use TestProcessor as the harness for the worker
26 
27 using namespace edm::shared_memory;
28 class Harness {
29 public:
30  Harness(std::string const& iConfig) : tester_(edm::test::TestProcessor::Config{iConfig}) {}
31 
32  edmtest::ThingCollection getBeginRunValue(unsigned int iRun) {
33  auto run = tester_.testBeginRun(iRun);
34  return *run.get<edmtest::ThingCollection>("beginRun");
35  }
36 
37  edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi) {
38  auto lumi = tester_.testBeginLuminosityBlock(iLumi);
39  return *lumi.get<edmtest::ThingCollection>("beginLumi");
40  }
41 
42  edmtest::ThingCollection getEventValue() {
43  auto event = tester_.test();
44  return *event.get<edmtest::ThingCollection>();
45  }
46 
47  edmtest::ThingCollection getEndLumiValue() {
48  auto lumi = tester_.testEndLuminosityBlock();
49  return *lumi.get<edmtest::ThingCollection>("endLumi");
50  }
51 
52  edmtest::ThingCollection getEndRunValue() {
53  auto run = tester_.testEndRun();
54  return *run.get<edmtest::ThingCollection>("endRun");
55  }
56 
57 private:
59 };
60 
61 int main(int argc, char* argv[]) {
62  std::string descString(argv[0]);
63  descString += " [--";
64  descString += kMemoryNameOpt;
65  descString += "] memory_name";
66  boost::program_options::options_description desc(descString);
67 
68  desc.add_options()(kHelpCommandOpt, "produce help message")(
70  boost::program_options::value<std::string>(),
71  "memory name")(kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id");
72 
73  boost::program_options::positional_options_description p;
74  p.add(kMemoryNameOpt, 1);
75  p.add(kUniqueIDOpt, 2);
76 
77  boost::program_options::options_description all_options("All Options");
78  all_options.add(desc);
79 
80  boost::program_options::variables_map vm;
81  try {
82  store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
83  notify(vm);
84  } catch (boost::program_options::error const& iException) {
85  std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
86  << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
87  return 1;
88  }
89 
90  if (vm.count(kHelpOpt)) {
91  std::cout << desc << std::endl;
92  return 0;
93  }
94 
95  if (!vm.count(kMemoryNameOpt)) {
96  std::cout << " no argument given" << std::endl;
97  return 1;
98  }
99 
100  if (!vm.count(kUniqueIDOpt)) {
101  std::cout << " no second argument given" << std::endl;
102  return 1;
103  }
104 
105  WorkerMonitorThread monitorThread;
106 
107  monitorThread.startThread();
108 
109  CMS_SA_ALLOW try {
110  std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
111  std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
112  {
113  //using namespace boost::interprocess;
114  //auto controlNameUnique = unique_name(memoryName, uniqueID);
115 
116  //This class is holding the lock
117  WorkerChannel communicationChannel(memoryName, uniqueID);
118 
119  WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
120  int counter = 0;
121 
122  //The lock must be released if there is a catastrophic signal
123  auto lockPtr = communicationChannel.accessLock();
124  monitorThread.setAction([lockPtr]() {
125  if (lockPtr) {
126  std::cerr << "SIGNAL CAUGHT: unlock\n";
127  lockPtr->unlock();
128  }
129  });
130 
132  TCSerializer serializer(sm_buffer);
133  TCSerializer br_serializer(sm_buffer);
134  TCSerializer bl_serializer(sm_buffer);
135  TCSerializer el_serializer(sm_buffer);
136  TCSerializer er_serializer(sm_buffer);
137 
138  std::cerr << uniqueID << " process: initializing " << std::endl;
139  int nlines;
140  std::cin >> nlines;
141 
143  for (int i = 0; i < nlines; ++i) {
144  std::string c;
145  std::getline(std::cin, c);
146  std::cerr << c << "\n";
147  configuration += c + "\n";
148  }
149 
150  Harness harness(configuration);
151 
152  //Either ROOT or the Framework are overriding the signal handlers
153  monitorThread.setupSignalHandling();
154 
155  std::cerr << uniqueID << " process: done initializing" << std::endl;
156  communicationChannel.workerSetupDone();
157 
158  std::cerr << uniqueID << " process: waiting " << counter << std::endl;
159  communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
160  ++counter;
161  switch (iTransition) {
163  std::cerr << uniqueID << " process: start beginRun " << std::endl;
164  auto value = harness.getBeginRunValue(iTransitionID);
165 
166  br_serializer.serialize(value);
167  std::cerr << uniqueID << " process: end beginRun " << value.size() << std::endl;
168 
169  break;
170  }
172  std::cerr << uniqueID << " process: start beginLumi " << std::endl;
173  auto value = harness.getBeginLumiValue(iTransitionID);
174 
175  bl_serializer.serialize(value);
176  std::cerr << uniqueID << " process: end beginLumi " << value.size() << std::endl;
177 
178  break;
179  }
180  case edm::Transition::Event: {
181  std::cerr << uniqueID << " process: integrating " << counter << std::endl;
182  auto value = harness.getEventValue();
183 
184  std::cerr << uniqueID << " process: integrated " << counter << std::endl;
185 
186  serializer.serialize(value);
187  std::cerr << uniqueID << " process: " << value.size() << " " << counter << std::endl;
188  //usleep(10000000);
189  break;
190  }
192  std::cerr << uniqueID << " process: start endLumi " << std::endl;
193  auto value = harness.getEndLumiValue();
194 
195  el_serializer.serialize(value);
196  std::cerr << uniqueID << " process: end endLumi " << value.size() << std::endl;
197 
198  break;
199  }
201  std::cerr << uniqueID << " process: start endRun " << std::endl;
202  auto value = harness.getEndRunValue();
203 
204  er_serializer.serialize(value);
205  std::cerr << uniqueID << " process: end endRun " << value.size() << std::endl;
206 
207  break;
208  }
209  default: {
210  assert(false);
211  }
212  }
213  std::cerr << uniqueID << " process: notifying and waiting" << counter << std::endl;
214  });
215  }
216  } catch (std::exception const& iExcept) {
217  std::cerr << "caught exception \n" << iExcept.what() << "\n";
218  return 1;
219  } catch (...) {
220  std::cerr << "caught unknown exception";
221  return 1;
222  }
223  return 0;
224 }
counter
Definition: counter.py:1
cmsBatch.argv
argv
Definition: cmsBatch.py:279
Harness::Harness
Harness(std::string const &iConfig)
Definition: interprocess.cc:30
Harness::getEndRunValue
edmtest::ThingCollection getEndRunValue()
Definition: interprocess.cc:52
mps_fire.i
i
Definition: mps_fire.py:428
dir2webdir.argc
argc
Definition: dir2webdir.py:39
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::Transition::Event
TestProcessor
gather_cfg.cout
cout
Definition: gather_cfg.py:144
cms::cuda::assert
assert(be >=bs)
edm::shared_memory::WorkerMonitorThread
Definition: WorkerMonitorThread.h:32
main
int main(int argc, char *argv[])
Definition: interprocess.cc:61
edm::shared_memory::ROOTSerializer
Definition: ROOTSerializer.h:31
Harness::tester_
edm::test::TestProcessor tester_
Definition: interprocess.cc:58
Harness
Definition: interprocess.cc:28
relativeConstraints.error
error
Definition: relativeConstraints.py:53
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::shared_memory::WorkerChannel::fromWorkerBufferInfo
BufferInfo * fromWorkerBufferInfo()
This can be used with WriteBuffer to keep Controller and Worker in sync.
Definition: WorkerChannel.h:53
TestProcessor.h
edm::shared_memory::WriteBuffer
Definition: WriteBuffer.h:37
edm::shared_memory::WorkerChannel::accessLock
boost::interprocess::scoped_lock< boost::interprocess::named_mutex > * accessLock()
the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of ...
Definition: WorkerChannel.h:48
edm::shared_memory::WorkerMonitorThread::setupSignalHandling
void setupSignalHandling()
Sets the unix signal handler which communicates with the thread.
Definition: WorkerMonitorThread.cc:102
test
Definition: SmallWORMDict.h:13
edm::shared_memory::WorkerMonitorThread::startThread
void startThread()
Definition: WorkerMonitorThread.cc:78
kUniqueIDCommandOpt
static char const *const kUniqueIDCommandOpt
Definition: interprocess.cc:21
edm::shared_memory::WorkerMonitorThread::setAction
void setAction(std::function< void()> iFunc)
Definition: WorkerMonitorThread.h:48
kMemoryNameOpt
static char const *const kMemoryNameOpt
Definition: interprocess.cc:18
edm::Transition::BeginLuminosityBlock
Harness::getBeginLumiValue
edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi)
Definition: interprocess.cc:37
kUniqueIDOpt
static char const *const kUniqueIDOpt
Definition: interprocess.cc:20
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
edm::Transition
Transition
Definition: Transition.h:12
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
ctppsRawToDigi_cff.configuration
configuration
Definition: ctppsRawToDigi_cff.py:11
Harness::getEventValue
edmtest::ThingCollection getEventValue()
Definition: interprocess.cc:42
WorkerChannel.h
value
Definition: value.py:1
counter
static std::atomic< unsigned int > counter
Definition: SharedResourceNames.cc:18
edm::test::TestProcessor
Definition: TestProcessor.h:159
Harness::getBeginRunValue
edmtest::ThingCollection getBeginRunValue(unsigned int iRun)
Definition: interprocess.cc:32
edm::shared_memory
Definition: buffer_names.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
kHelpOpt
static char const *const kHelpOpt
Definition: interprocess.cc:22
edm::shared_memory::WorkerChannel::handleTransitions
void handleTransitions(F &&iF)
Definition: WorkerChannel.h:65
Harness::getEndLumiValue
edmtest::ThingCollection getEndLumiValue()
Definition: interprocess.cc:47
edm::shared_memory::WorkerChannel
Definition: WorkerChannel.h:35
kHelpCommandOpt
static char const *const kHelpCommandOpt
Definition: interprocess.cc:23
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::Transition::EndLuminosityBlock
writedatasetfile.run
run
Definition: writedatasetfile.py:27
edm::Transition::BeginRun
AlcaSiPixelAliHarvester0T_cff.options
options
Definition: AlcaSiPixelAliHarvester0T_cff.py:42
WriteBuffer.h
WorkerMonitorThread.h
kMemoryNameCommandOpt
static char const *const kMemoryNameCommandOpt
Definition: interprocess.cc:19
Config
Definition: Config.py:1
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:56
edm::Transition::EndRun
edm::shared_memory::WorkerChannel::workerSetupDone
void workerSetupDone()
Matches the ControllerChannel::setupWorker call.
Definition: WorkerChannel.h:56
EcnaPython_AdcPeg12_S1_10_R170298_1_0_150_Dee0.cerr
cerr
Definition: EcnaPython_AdcPeg12_S1_10_R170298_1_0_150_Dee0.py:8
lumi
Definition: LumiSectionData.h:20
ROOTSerializer.h