CMS 3D CMS Logo

TestInterProcessProd.cc
Go to the documentation of this file.
5 
6 #include "DataFormats/TestObjects/interface/ToyProducts.h"
7 #include "DataFormats/TestObjects/interface/ThingCollection.h"
8 
9 #include <cstdio>
10 #include <iostream>
11 
15 
16 using namespace edm::shared_memory;
17 namespace testinter {
18 
19  struct StreamCache {
20  StreamCache(const std::string& iConfig, int id)
21  : id_{id},
22  channel_("testProd", id_, 60),
23  readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
24  deserializer_{readBuffer_},
25  br_deserializer_{readBuffer_},
26  er_deserializer_{readBuffer_},
27  bl_deserializer_{readBuffer_},
28  el_deserializer_(readBuffer_) {
29  //make sure output is flushed before popen does any writing
30  fflush(stdout);
31  fflush(stderr);
32 
33  channel_.setupWorker([&]() {
34  using namespace std::string_literals;
35  std::cout << id_ << " starting external process" << std::endl;
36  pipe_ = popen(("cmsTestInterProcess "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(), "w");
37 
38  if (nullptr == pipe_) {
39  abort();
40  }
41 
42  {
43  auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
44  auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
45  assert(result == nlines.size());
46  result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
47  assert(result == iConfig.size());
48  fflush(pipe_);
49  }
50  });
51  }
52 
53  template <typename SERIAL>
54  auto doTransition(SERIAL& iDeserializer,
55  edm::Transition iTrans,
56  unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize()) {
57  decltype(iDeserializer.deserialize()) value;
58  if (not channel_.doTransition(
59  [&value, this]() {
60  value = deserializer_.deserialize();
61  std::cout << id_ << " from shared memory " << value.size() << std::endl;
62  },
63  iTrans,
64  iTransitionID)) {
65  std::cout << id_ << " FAILED waiting for external process" << std::endl;
66  externalFailed_ = true;
68  }
69  return value;
70  }
71  edmtest::ThingCollection produce(unsigned long long iTransitionID) {
72  return doTransition(deserializer_, edm::Transition::Event, iTransitionID);
73  }
74 
75  edmtest::ThingCollection beginRunProduce(unsigned long long iTransitionID) {
76  return doTransition(br_deserializer_, edm::Transition::BeginRun, iTransitionID);
77  }
78 
79  edmtest::ThingCollection endRunProduce(unsigned long long iTransitionID) {
80  if (not externalFailed_) {
81  return doTransition(er_deserializer_, edm::Transition::EndRun, iTransitionID);
82  }
83  return edmtest::ThingCollection();
84  }
85 
86  edmtest::ThingCollection beginLumiProduce(unsigned long long iTransitionID) {
87  return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
88  }
89 
90  edmtest::ThingCollection endLumiProduce(unsigned long long iTransitionID) {
91  if (not externalFailed_) {
92  return doTransition(el_deserializer_, edm::Transition::EndLuminosityBlock, iTransitionID);
93  }
94  return edmtest::ThingCollection();
95  }
96 
98  channel_.stopWorker();
99  pclose(pipe_);
100  }
101 
102  private:
104  auto pid = getpid();
105  iBase += std::to_string(pid);
106  iBase += "_";
107  iBase += std::to_string(id_);
108 
109  return iBase;
110  }
111 
112  int id_;
113  FILE* pipe_;
116 
123  bool externalFailed_ = false;
124  };
125 
126  struct RunCache {
127  //Only stream 0 sets this at stream end Run and it is read at global end run
128  // the framework guarantees those calls can not happen simultaneously
129  CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
130  };
131  struct LumiCache {
132  //Only stream 0 sets this at stream end Lumi and it is read at global end Lumi
133  // the framework guarantees those calls can not happen simultaneously
134  CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
135  };
136 } // namespace testinter
137 
138 class TestInterProcessProd : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
139  edm::RunCache<testinter::RunCache>,
140  edm::BeginRunProducer,
141  edm::EndRunProducer,
142  edm::LuminosityBlockCache<testinter::LumiCache>,
143  edm::BeginLuminosityBlockProducer,
144  edm::EndLuminosityBlockProducer> {
145 public:
147 
148  std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
149  void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
150 
151  void globalBeginRunProduce(edm::Run&, edm::EventSetup const&) const final;
152  std::shared_ptr<testinter::RunCache> globalBeginRun(edm::Run const&, edm::EventSetup const&) const final;
153  void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
154  void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
155  void globalEndRun(edm::Run const&, edm::EventSetup const&) const final {}
156  void globalEndRunProduce(edm::Run&, edm::EventSetup const&) const final;
157 
158  void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
159  std::shared_ptr<testinter::LumiCache> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
160  edm::EventSetup const&) const final;
161  void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
162  void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
164  void globalEndLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
165 
166 private:
172 
174 
175  //This is set at beginStream and used for globalBeginRun
176  //The framework guarantees that none of those can happen concurrently
177  CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
178  //A stream which has finished processing the last lumi is used for the
179  // call to globalBeginLuminosityBlockProduce
180  mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
181  //Streams all see the lumis in the same order, we want to be sure to pick a stream cache
182  // to use at globalBeginLumi which just finished the most recent lumi and not a previous one
183  mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
184 };
185 
187  : token_{produces<edmtest::ThingCollection>()},
188  brToken_{produces<edmtest::ThingCollection, edm::Transition::BeginRun>("beginRun")},
189  erToken_{produces<edmtest::ThingCollection, edm::Transition::EndRun>("endRun")},
190  blToken_{produces<edmtest::ThingCollection, edm::Transition::BeginLuminosityBlock>("beginLumi")},
191  elToken_{produces<edmtest::ThingCollection, edm::Transition::EndLuminosityBlock>("endLumi")},
192  config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
193 
194 std::unique_ptr<testinter::StreamCache> TestInterProcessProd::beginStream(edm::StreamID iID) const {
195  auto const label = moduleDescription().moduleLabel();
196 
197  using namespace std::string_literals;
198 
199  std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
200 process = TestProcess()
201 )_";
202  config += "process."s + label + "=" + config_ + "\n";
203  config += "process.moduleToTest(process."s + label + ")\n";
204  config += R"_(
205 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
206  )_";
207 
208  auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
209  if (iID.value() == 0) {
210  stream0Cache_ = cache.get();
211 
213  }
214 
215  return cache;
216 }
217 
219  auto value = streamCache(iID)->produce(iEvent.id().event());
220  iEvent.emplace(token_, value);
221 }
222 
224  auto v = stream0Cache_->beginRunProduce(iRun.run());
225  iRun.emplace(brToken_, v);
226 }
227 std::shared_ptr<testinter::RunCache> TestInterProcessProd::globalBeginRun(edm::Run const&,
228  edm::EventSetup const&) const {
229  return std::make_shared<testinter::RunCache>();
230 }
231 
233  if (iID.value() != 0) {
234  (void)streamCache(iID)->beginRunProduce(iRun.run());
235  }
236 }
238  if (iID.value() == 0) {
239  runCache(iRun.index())->thingCollection_ = streamCache(iID)->endRunProduce(iRun.run());
240  } else {
241  (void)streamCache(iID)->endRunProduce(iRun.run());
242  }
243 }
245  iRun.emplace(erToken_, std::move(runCache(iRun.index())->thingCollection_));
246 }
247 
249  edm::EventSetup const&) const {
250  while (not availableForBeginLumi_.load()) {
251  }
252 
253  auto v = availableForBeginLumi_.load()->beginLumiProduce(iLuminosityBlock.run());
254  iLuminosityBlock.emplace(blToken_, v);
255 
256  lastLumiIndex_.store(iLuminosityBlock.index());
257 }
258 
259 std::shared_ptr<testinter::LumiCache> TestInterProcessProd::globalBeginLuminosityBlock(edm::LuminosityBlock const&,
260  edm::EventSetup const&) const {
261  return std::make_shared<testinter::LumiCache>();
262 }
263 
265  edm::LuminosityBlock const& iLuminosityBlock,
266  edm::EventSetup const&) const {
267  auto cache = streamCache(iID);
268  if (cache != availableForBeginLumi_.load()) {
269  (void)cache->beginLumiProduce(iLuminosityBlock.run());
270  } else {
271  availableForBeginLumi_ = nullptr;
272  }
273 }
274 
276  edm::LuminosityBlock const& iLuminosityBlock,
277  edm::EventSetup const&) const {
278  if (iID.value() == 0) {
279  luminosityBlockCache(iLuminosityBlock.index())->thingCollection_ =
280  streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
281  } else {
282  (void)streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
283  }
284 
285  if (lastLumiIndex_ == iLuminosityBlock.index()) {
286  testinter::StreamCache* expected = nullptr;
287 
288  availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
289  }
290 }
291 
293  edm::EventSetup const&) const {
294  iLuminosityBlock.emplace(elToken_, std::move(luminosityBlockCache(iLuminosityBlock.index())->thingCollection_));
295 }
296 
void globalBeginRunProduce(edm::Run &, edm::EventSetup const &) const final
edmtest::ThingCollection beginRunProduce(unsigned long long iTransitionID)
testinter::StreamCache * stream0Cache_
void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const &, edm::EventSetup const &) const final
void globalEndRun(edm::Run const &, edm::EventSetup const &) const final
void emplace(EDPutTokenT< PROD > token, Args &&... args)
puts a new product
Definition: Run.h:244
edm::EDPutTokenT< edmtest::ThingCollection > const blToken_
void globalEndLuminosityBlock(edm::LuminosityBlock const &, edm::EventSetup const &) const final
Definition: config.py:1
assert(be >=bs)
static std::string to_string(const XMLCh *ch)
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
edmtest::ThingCollection thingCollection_
auto doTransition(SERIAL &iDeserializer, edm::Transition iTrans, unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize())
char const * label
int iEvent
Definition: GenABIO.cc:224
RunNumber_t run() const
Definition: RunBase.h:40
edmtest::ThingCollection beginLumiProduce(unsigned long long iTransitionID)
edmtest::ThingCollection produce(unsigned long long iTransitionID)
void streamBeginRun(edm::StreamID, edm::Run const &, edm::EventSetup const &) const final
edm::EDPutTokenT< edmtest::ThingCollection > const elToken_
ModuleDescription const & moduleDescription() const
Transition
Definition: Transition.h:12
edm::EDPutTokenT< edmtest::ThingCollection > const token_
#define CMS_THREAD_SAFE
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
Definition: value.py:1
edmtest::ThingCollection thingCollection_
std::atomic< unsigned int > lastLumiIndex_
std::unique_ptr< testinter::StreamCache > beginStream(edm::StreamID) const final
std::atomic< testinter::StreamCache * > availableForBeginLumi_
edmtest::ThingCollection endRunProduce(unsigned long long iTransitionID)
void emplace(EDPutTokenT< PROD > token, Args &&... args)
puts a new product
std::string unique_name(std::string iBase)
static const char pipe_[]
edm::EDPutTokenT< edmtest::ThingCollection > const erToken_
edm::EDPutTokenT< edmtest::ThingCollection > const brToken_
void produce(edm::StreamID, edm::Event &, edm::EventSetup const &) const final
std::shared_ptr< testinter::RunCache > globalBeginRun(edm::Run const &, edm::EventSetup const &) const final
void globalBeginLuminosityBlockProduce(edm::LuminosityBlock &, edm::EventSetup const &) const final
edmtest::ThingCollection endLumiProduce(unsigned long long iTransitionID)
TestInterProcessProd(edm::ParameterSet const &)
LuminosityBlockIndex index() const
std::shared_ptr< testinter::LumiCache > globalBeginLuminosityBlock(edm::LuminosityBlock const &, edm::EventSetup const &) const final
RunIndex index() const
Definition: Run.cc:28
void streamEndRun(edm::StreamID, edm::Run const &, edm::EventSetup const &) const final
void globalEndRunProduce(edm::Run &, edm::EventSetup const &) const final
StreamCache(const std::string &iConfig, int id)
def cache(function)
Definition: utilities.py:3
dictionary config
Read in AllInOne config in JSON format.
Definition: DiMuonV_cfg.py:30
unsigned int value() const
Definition: StreamID.h:43
void globalEndLuminosityBlockProduce(edm::LuminosityBlock &, edm::EventSetup const &) const final
std::string const & moduleLabel() const
def move(src, dest)
Definition: eostools.py:511
Definition: Run.h:45
void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const &, edm::EventSetup const &) const final