CMS 3D CMS Logo

TestInterProcessRandomProd.cc
Go to the documentation of this file.
5 
6 #include "DataFormats/TestObjects/interface/ToyProducts.h"
7 #include "DataFormats/TestObjects/interface/ThingCollection.h"
9 
12 
13 #include <cstdio>
14 #include <iostream>
15 
21 
22 #include "CLHEP/Random/RandomEngine.h"
23 #include "CLHEP/Random/engineIDulong.h"
24 #include "CLHEP/Random/RanecuEngine.h"
25 
26 using namespace edm::shared_memory;
27 namespace testinter {
28 
29  using ReturnedType = std::pair<edmtest::IntProduct, edm::RandomNumberGeneratorState>;
30 
31  struct StreamCache {
32  StreamCache(const std::string& iConfig, int id)
33  : id_{id},
34  channel_("testProd", id_, 60),
35  readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
36  writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferInfo()},
37  deserializer_{readBuffer_},
38  bl_deserializer_{readBuffer_},
39  randSerializer_{writeBuffer_} {
40  //make sure output is flushed before popen does any writing
41  fflush(stdout);
42  fflush(stderr);
43 
44  channel_.setupWorker([&]() {
45  using namespace std::string_literals;
46  std::cout << id_ << " starting external process" << std::endl;
47  pipe_ = popen(("cmsTestInterProcessRandom "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(),
48  "w");
49 
50  if (nullptr == pipe_) {
51  abort();
52  }
53 
54  {
55  auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
56  auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
57  assert(result == nlines.size());
58  result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
59  assert(result == iConfig.size());
60  fflush(pipe_);
61  }
62  });
63  }
64 
65  template <typename SERIAL>
66  auto doTransition(SERIAL& iDeserializer,
67  edm::Transition iTrans,
68  unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize()) {
69  decltype(iDeserializer.deserialize()) value;
70  if (not channel_.doTransition(
71  [&value, this]() {
72  value = deserializer_.deserialize();
73  std::cout << id_ << " from shared memory " << value.first.value << std::endl;
74  },
75  iTrans,
76  iTransitionID)) {
77  std::cout << id_ << " FAILED waiting for external process" << std::endl;
78  externalFailed_ = true;
80  }
81  return value;
82  }
83  edmtest::IntProduct produce(unsigned long long iTransitionID, edm::StreamID iStream) {
85  auto& engine = gen->getEngine(iStream);
86  edm::RandomNumberGeneratorState state{engine.put(), engine.getSeed()};
87  randSerializer_.serialize(state);
88  auto v = doTransition(deserializer_, edm::Transition::Event, iTransitionID);
89  if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
90  engine.setSeed(v.second.seed_, 0);
91  }
92  engine.get(v.second.state_);
93  return v.first;
94  }
95 
97  unsigned long long iTransitionID,
100  //NOTE: root serialize requires a `void*` not a `void const*` even though it doesn't modify the object
101  randSerializer_.serialize(const_cast<edm::RandomNumberGeneratorState&>(iState));
102  return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
103  }
104 
106  channel_.stopWorker();
107  pclose(pipe_);
108  }
109 
110  private:
112  auto pid = getpid();
113  iBase += std::to_string(pid);
114  iBase += "_";
115  iBase += std::to_string(id_);
116 
117  return iBase;
118  }
119 
120  int id_;
121  FILE* pipe_;
122  ControllerChannel channel_;
123  ReadBuffer readBuffer_;
125 
127  TCDeserializer deserializer_;
128  TCDeserializer bl_deserializer_;
131 
132  bool externalFailed_ = false;
133  };
134 
135 } // namespace testinter
136 
138  : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
139  edm::LuminosityBlockCache<edm::RandomNumberGeneratorState>,
140  edm::BeginLuminosityBlockProducer> {
141 public:
143 
144  std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
145  void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
146 
147  void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
148  void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
149 
150  std::shared_ptr<edm::RandomNumberGeneratorState> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
151  edm::EventSetup const&) const final;
153 
154  void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
155  void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
156  void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
157 
158 private:
161 
163 
164  //This is set at beginStream and used for globalBeginRun
165  //The framework guarantees that non of those can happen concurrently
166  CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
167  //A stream which has finished processing the last lumi is used for the
168  // call to globalBeginLuminosityBlockProduce
169  mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
170  //Streams all see the lumis in the same order, we want to be sure to pick a stream cache
171  // to use at globalBeginLumi which just finished the most recent lumi and not a previous one
172  mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
173 };
174 
176  : token_{produces<edmtest::IntProduct>()},
177  blToken_{produces<edmtest::IntProduct, edm::Transition::BeginLuminosityBlock>("lumi")},
178  config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
179 
180 std::unique_ptr<testinter::StreamCache> TestInterProcessRandomProd::beginStream(edm::StreamID iID) const {
181  auto const label = moduleDescription().moduleLabel();
182 
183  using namespace std::string_literals;
184 
185  std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
186 process = TestProcess()
187 )_";
188  config += "process."s + label + "=" + config_ + "\n";
189  config += "process.moduleToTest(process."s + label + ")\n";
190  config += R"_(
191 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
192  )_";
193 
194  auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
195  if (iID.value() == 0) {
196  stream0Cache_ = cache.get();
197 
199  }
200 
201  return cache;
202 }
203 
205  auto value = streamCache(iID)->produce(iEvent.id().event(), iID);
206  iEvent.emplace(token_, value);
207 }
208 
209 std::shared_ptr<edm::RandomNumberGeneratorState> TestInterProcessRandomProd::globalBeginLuminosityBlock(
210  edm::LuminosityBlock const& iLumi, edm::EventSetup const&) const {
212  auto& engine = gen->getEngine(iLumi.index());
213  return std::make_shared<edm::RandomNumberGeneratorState>(engine.put(), engine.getSeed());
214 }
215 
217  edm::EventSetup const&) const {
218  while (not availableForBeginLumi_.load()) {
219  }
220 
221  auto v = availableForBeginLumi_.load()->beginLumiProduce(
222  *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
224  auto& engine = gen->getEngine(iLuminosityBlock.index());
225  if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
226  engine.setSeed(v.second.seed_, 0);
227  }
228  engine.get(v.second.state_);
229 
230  iLuminosityBlock.emplace(blToken_, v.first);
231 
232  lastLumiIndex_.store(iLuminosityBlock.index());
233 }
234 
236  edm::LuminosityBlock const& iLuminosityBlock,
237  edm::EventSetup const&) const {
238  auto cache = streamCache(iID);
239  if (cache != availableForBeginLumi_.load()) {
240  (void)cache->beginLumiProduce(
241  *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
242  } else {
243  availableForBeginLumi_ = nullptr;
244  }
245 }
246 
248  edm::LuminosityBlock const& iLuminosityBlock,
249  edm::EventSetup const&) const {
250  if (lastLumiIndex_ == iLuminosityBlock.index()) {
251  testinter::StreamCache* expected = nullptr;
252 
253  availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
254  }
255 }
256 
void streamEndRun(edm::StreamID, edm::Run const &, edm::EventSetup const &) const final
testinter::StreamCache * stream0Cache_
void produce(edm::StreamID, edm::Event &, edm::EventSetup const &) const final
std::atomic< unsigned int > lastLumiIndex_
void globalBeginLuminosityBlockProduce(edm::LuminosityBlock &, edm::EventSetup const &) const final
std::atomic< testinter::StreamCache * > availableForBeginLumi_
edm::EDPutTokenT< edmtest::IntProduct > const token_
void globalEndLuminosityBlock(edm::LuminosityBlock const &, edm::EventSetup const &) const final
void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const &, edm::EventSetup const &) const final
TestInterProcessRandomProd(edm::ParameterSet const &)
Definition: config.py:1
assert(be >=bs)
std::shared_ptr< edm::RandomNumberGeneratorState > globalBeginLuminosityBlock(edm::LuminosityBlock const &, edm::EventSetup const &) const final
ReturnedType beginLumiProduce(edm::RandomNumberGeneratorState const &iState, unsigned long long iTransitionID, edm::LuminosityBlockIndex iLumi)
void streamBeginRun(edm::StreamID, edm::Run const &, edm::EventSetup const &) const final
static std::string to_string(const XMLCh *ch)
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
auto doTransition(SERIAL &iDeserializer, edm::Transition iTrans, unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize())
edmtest::IntProduct produce(unsigned long long iTransitionID, edm::StreamID iStream)
char const * label
int iEvent
Definition: GenABIO.cc:224
ModuleDescription const & moduleDescription() const
Transition
Definition: Transition.h:12
#define CMS_THREAD_SAFE
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
std::unique_ptr< testinter::StreamCache > beginStream(edm::StreamID) const final
Definition: value.py:1
def gen(fragment, howMuch)
Tau Embedding 2016 HIPM ###.
edm::EDPutTokenT< edmtest::IntProduct > const blToken_
void emplace(EDPutTokenT< PROD > token, Args &&... args)
puts a new product
std::string unique_name(std::string iBase)
static const char pipe_[]
LuminosityBlockIndex index() const
std::pair< edmtest::IntProduct, edm::RandomNumberGeneratorState > ReturnedType
StreamCache(const std::string &iConfig, int id)
def cache(function)
Definition: utilities.py:3
dictionary config
Read in AllInOne config in JSON format.
Definition: DiMuonV_cfg.py:30
unsigned int value() const
Definition: StreamID.h:43
LuminosityBlockNumber_t luminosityBlock() const
std::string const & moduleLabel() const
void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const &, edm::EventSetup const &) const final
Definition: Run.h:45