CMS 3D CMS Logo

TestInterProcessRandomProd.cc
Go to the documentation of this file.
5 
6 #include "DataFormats/TestObjects/interface/ToyProducts.h"
7 #include "DataFormats/TestObjects/interface/ThingCollection.h"
9 
12 
13 #include <cstdio>
14 #include <iostream>
15 
21 
22 #include "CLHEP/Random/RandomEngine.h"
23 #include "CLHEP/Random/engineIDulong.h"
24 #include "CLHEP/Random/RanecuEngine.h"
25 
26 using namespace edm::shared_memory;
27 namespace testinter {
28 
29  using ReturnedType = std::pair<edmtest::IntProduct, edm::RandomNumberGeneratorState>;
30 
31  struct StreamCache {
// Builds the per-stream state: the controller channel, the read/write shared-memory
// buffers, the two deserializers reading from readBuffer_ and the serializer writing to
// writeBuffer_. It then starts the external worker process and feeds it its configuration.
//
// iConfig: configuration text that is written verbatim to the worker's standard input.
// id:      identifier stored in id_; it is passed to the channel and printed in log lines.
//
// NOTE(review): the meaning of the third channel argument (60) is not visible in this
// listing -- presumably a timeout; confirm against ControllerChannel.
32  StreamCache(const std::string& iConfig, int id)
33  : id_{id},
34  channel_("testProd", id_, 60),
35  readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
// The write buffer uses the channel's shared-memory name prefixed with "Rand".
36  writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferInfo()},
37  deserializer_{readBuffer_},
38  bl_deserializer_{readBuffer_},
39  randSerializer_{writeBuffer_} {
40  //make sure output is flushed before popen does any writing
41  fflush(stdout);
42  fflush(stderr);
43 
// The callback handed to setupWorker launches the worker; pipe_ is assigned here rather
// than in the member-initializer list.
44  channel_.setupWorker([&]() {
45  using namespace std::string_literals;
46  std::cout << id_ << " starting external process" << std::endl;
// Command line: "cmsTestInterProcessRandom <sharedMemoryName> <uniqueID>", opened in
// write mode so that this process can write to the worker's stdin.
47  pipe_ = popen(("cmsTestInterProcessRandom "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(),
48  "w");
49 
// A failed popen terminates the whole process.
50  if (nullptr == pipe_) {
51  abort();
52  }
53 
54  {
// Protocol: first the number of '\n' characters in iConfig as decimal text, then the
// configuration itself. No separator is written between the two.
// NOTE(review): the write results are only checked with assert, which is compiled out
// when NDEBUG is defined; short writes would then go unnoticed.
55  auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
56  auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
57  assert(result == nlines.size());
58  result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
59  assert(result == iConfig.size());
60  fflush(pipe_);
61  }
62  });
63  }
64 
// Asks the channel to perform transition iTrans with id iTransitionID and returns the
// value deserialized inside the channel callback.
//
// The return type is whatever iDeserializer.deserialize() yields. If channel_.doTransition
// reports failure, a message is printed and externalFailed_ is set.
//
// NOTE(review): the callback reads through the member deserializer_, not through the
// iDeserializer argument; in the visible code iDeserializer only contributes its type.
// Confirm whether the argument was meant to be used for the actual read.
// NOTE(review): this listing skips its own line 78 (between setting externalFailed_ and
// the closing brace), so the complete failure handling is not visible here.
65  template <typename SERIAL>
66  auto doTransition(SERIAL& iDeserializer, edm::Transition iTrans, unsigned long long iTransitionID)
67  -> decltype(iDeserializer.deserialize()) {
68  decltype(iDeserializer.deserialize()) value;
69  if (not channel_.doTransition(
70  [&value, this]() {
71  value = deserializer_.deserialize();
// Log the integer payload received from the worker.
72  std::cout << id_ << " from shared memory " << value.first.value << std::endl;
73  },
74  iTrans,
75  iTransitionID)) {
76  std::cout << id_ << " FAILED waiting for external process" << std::endl;
77  externalFailed_ = true;
79  }
80  return value;
81  }
// Runs one Event transition through the external worker and returns the IntProduct it
// sent back.
//
// The stream's random engine state is serialized to the worker before the transition,
// and the state returned by the worker is loaded back into the engine afterwards.
//
// iTransitionID: id forwarded to doTransition.
// iStream:       stream whose engine is obtained via gen->getEngine.
//
// NOTE(review): `gen` is declared on a line missing from this listing (its line 83).
82  edmtest::IntProduct produce(unsigned long long iTransitionID, edm::StreamID iStream) {
84  auto& engine = gen->getEngine(iStream);
// Snapshot the engine (state from put(), seed from getSeed()) and hand it to the worker.
85  edm::RandomNumberGeneratorState state{engine.put(), engine.getSeed()};
86  randSerializer_.serialize(state);
87  auto v = doTransition(deserializer_, edm::Transition::Event, iTransitionID);
// The seed is set explicitly unless the first state word identifies a RanecuEngine.
// NOTE(review): presumably RanecuEngine restores its seed from the state vector -- confirm.
88  if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
89  engine.setSeed(v.second.seed_, 0);
90  }
// Load the state returned by the worker into the local engine.
91  engine.get(v.second.state_);
92  return v.first;
93  }
94 
96  unsigned long long iTransitionID,
99  //NOTE: root serialize requires a `void*` not a `void const*` even though it doesn't modify the object
100  randSerializer_.serialize(const_cast<edm::RandomNumberGeneratorState&>(iState));
101  return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
102  }
103 
105  channel_.stopWorker();
106  pclose(pipe_);
107  }
108 
109  private:
111  auto pid = getpid();
112  iBase += std::to_string(pid);
113  iBase += "_";
114  iBase += std::to_string(id_);
115 
116  return iBase;
117  }
118 
119  int id_;
120  FILE* pipe_;
121  ControllerChannel channel_;
122  ReadBuffer readBuffer_;
124 
126  TCDeserializer deserializer_;
127  TCDeserializer bl_deserializer_;
130 
131  bool externalFailed_ = false;
132  };
133 
134 } // namespace testinter
135 
137  : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
138  edm::LuminosityBlockCache<edm::RandomNumberGeneratorState>,
139  edm::BeginLuminosityBlockProducer> {
140 public:
142 
143  std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
144  void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
145 
146  void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
147  void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
148 
149  std::shared_ptr<edm::RandomNumberGeneratorState> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
150  edm::EventSetup const&) const final;
152 
153  void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
154  void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
155  void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
156 
157 private:
160 
162 
163  //This is set at beginStream and used for globalBeginRun
164  //The framework guarantees that none of those can happen concurrently
165  CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
166  //A stream which has finished processing the last lumi is used for the
167  // call to globalBeginLuminosityBlockProduce
168  mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
169  //All streams see the lumis in the same order. At globalBeginLumi we want to pick a
170  // stream cache that has just finished the most recent lumi, not an earlier one
171  mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
172 };
173 
175  : token_{produces<edmtest::IntProduct>()},
176  blToken_{produces<edmtest::IntProduct, edm::Transition::BeginLuminosityBlock>("lumi")},
177  config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
178 
// Creates the StreamCache for stream iID.
//
// It assembles the configuration text passed to the StreamCache constructor: a fixed
// preamble, a line assigning config_ to process.<module label>, a moduleToTest call for
// that module, and a line adding the InitRootHandlers service. For stream 0 the raw
// cache pointer is also stored in stream0Cache_.
//
// NOTE(review): this listing skips its own line 197 inside the `if` for stream 0, so
// any further initialisation done there is not visible here.
179 std::unique_ptr<testinter::StreamCache> TestInterProcessRandomProd::beginStream(edm::StreamID iID) const {
180  auto const label = moduleDescription().moduleLabel();
181 
182  using namespace std::string_literals;
183 
184  std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
185 process = TestProcess()
186 )_";
187  config += "process."s + label + "=" + config_ + "\n";
188  config += "process.moduleToTest(process."s + label + ")\n";
189  config += R"_(
190 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
191  )_";
192 
193  auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
194  if (iID.value() == 0) {
// Non-owning pointer; ownership of the cache is returned to the caller below.
195  stream0Cache_ = cache.get();
196 
198  }
199 
200  return cache;
201 }
202 
204  auto value = streamCache(iID)->produce(iEvent.id().event(), iID);
205  iEvent.emplace(token_, value);
206 }
207 
// Returns a snapshot of the random engine associated with this luminosity block's index,
// built from the engine's put() state and getSeed() value.
//
// NOTE(review): `gen` is declared on a line missing from this listing (its line 210).
208 std::shared_ptr<edm::RandomNumberGeneratorState> TestInterProcessRandomProd::globalBeginLuminosityBlock(
209  edm::LuminosityBlock const& iLumi, edm::EventSetup const&) const {
211  auto& engine = gen->getEngine(iLumi.index());
212  return std::make_shared<edm::RandomNumberGeneratorState>(engine.put(), engine.getSeed());
213 }
214 
216  edm::EventSetup const&) const {
217  while (not availableForBeginLumi_.load()) {
218  }
219 
220  auto v = availableForBeginLumi_.load()->beginLumiProduce(
221  *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
223  auto& engine = gen->getEngine(iLuminosityBlock.index());
224  if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
225  engine.setSeed(v.second.seed_, 0);
226  }
227  engine.get(v.second.state_);
228 
229  iLuminosityBlock.emplace(blToken_, v.first);
230 
231  lastLumiIndex_.store(iLuminosityBlock.index());
232 }
233 
235  edm::LuminosityBlock const& iLuminosityBlock,
236  edm::EventSetup const&) const {
237  auto cache = streamCache(iID);
238  if (cache != availableForBeginLumi_.load()) {
239  (void)cache->beginLumiProduce(
240  *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
241  } else {
242  availableForBeginLumi_ = nullptr;
243  }
244 }
245 
247  edm::LuminosityBlock const& iLuminosityBlock,
248  edm::EventSetup const&) const {
249  if (lastLumiIndex_ == iLuminosityBlock.index()) {
250  testinter::StreamCache* expected = nullptr;
251 
252  availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
253  }
254 }
255 
void streamEndRun(edm::StreamID, edm::Run const &, edm::EventSetup const &) const final
testinter::StreamCache * stream0Cache_
void produce(edm::StreamID, edm::Event &, edm::EventSetup const &) const final
std::atomic< unsigned int > lastLumiIndex_
void globalBeginLuminosityBlockProduce(edm::LuminosityBlock &, edm::EventSetup const &) const final
std::atomic< testinter::StreamCache * > availableForBeginLumi_
edm::EDPutTokenT< edmtest::IntProduct > const token_
void globalEndLuminosityBlock(edm::LuminosityBlock const &, edm::EventSetup const &) const final
void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const &, edm::EventSetup const &) const final
dictionary config
Read in AllInOne config in JSON format.
Definition: DMR_cfg.py:21
TestInterProcessRandomProd(edm::ParameterSet const &)
Definition: config.py:1
assert(be >=bs)
std::shared_ptr< edm::RandomNumberGeneratorState > globalBeginLuminosityBlock(edm::LuminosityBlock const &, edm::EventSetup const &) const final
ReturnedType beginLumiProduce(edm::RandomNumberGeneratorState const &iState, unsigned long long iTransitionID, edm::LuminosityBlockIndex iLumi)
void streamBeginRun(edm::StreamID, edm::Run const &, edm::EventSetup const &) const final
static std::string to_string(const XMLCh *ch)
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
auto doTransition(SERIAL &iDeserializer, edm::Transition iTrans, unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize())
edmtest::IntProduct produce(unsigned long long iTransitionID, edm::StreamID iStream)
char const * label
int iEvent
Definition: GenABIO.cc:224
ModuleDescription const & moduleDescription() const
Transition
Definition: Transition.h:12
#define CMS_THREAD_SAFE
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
std::unique_ptr< testinter::StreamCache > beginStream(edm::StreamID) const final
Definition: value.py:1
def gen(fragment, howMuch)
Production test section ####.
edm::EDPutTokenT< edmtest::IntProduct > const blToken_
void emplace(EDPutTokenT< PROD > token, Args &&... args)
puts a new product
std::string unique_name(std::string iBase)
static const char pipe_[]
LuminosityBlockIndex index() const
std::pair< edmtest::IntProduct, edm::RandomNumberGeneratorState > ReturnedType
StreamCache(const std::string &iConfig, int id)
def cache(function)
Definition: utilities.py:3
unsigned int value() const
Definition: StreamID.h:43
LuminosityBlockNumber_t luminosityBlock() const
std::string const & moduleLabel() const
void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const &, edm::EventSetup const &) const final
Definition: Run.h:45