CMS 3D CMS Logo

AcquireIntProducer.cc
Go to the documentation of this file.
2 #include "DataFormats/TestObjects/interface/ToyProducts.h"
8 #include "WaitingServer.h"
13 
14 #include <memory>
15 #include <unistd.h>
16 #include <vector>
17 
18 namespace edm {
19  class EventSetup;
20 }
21 
22 namespace edmtest {
23  using namespace std::chrono_literals;
24  class AcquireIntProducer : public edm::global::EDProducer<edm::ExternalWork, edm::StreamCache<test_acquire::Cache>> {
25  public:
26  explicit AcquireIntProducer(edm::ParameterSet const& pset);
27  ~AcquireIntProducer() override;
28 
29  std::unique_ptr<test_acquire::Cache> beginStream(edm::StreamID) const override;
30 
31  void acquire(edm::StreamID,
32  edm::Event const&,
33  edm::EventSetup const&,
34  edm::WaitingTaskWithArenaHolder) const override;
35 
36  void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const override;
37 
38  void endJob() override;
39 
40  private:
41  void preallocate(edm::PreallocationConfiguration const&) override;
42 
43  std::vector<edm::EDGetTokenT<IntProduct>> m_tokens;
45  std::unique_ptr<test_acquire::WaitingServer> m_server;
46  const unsigned int m_numberOfStreamsToAccumulate;
47  const unsigned int m_secondsToWaitForWork;
48  };
49 
51  : m_numberOfStreamsToAccumulate(pset.getUntrackedParameter<unsigned int>("streamsToAccumulate", 8)),
52  m_secondsToWaitForWork(pset.getUntrackedParameter<unsigned int>("secondsToWaitForWork", 1)) {
53  for (auto const& tag : pset.getParameter<std::vector<edm::InputTag>>("tags")) {
54  m_tokens.emplace_back(consumes<IntProduct>(tag));
55  }
56  m_tokenForProduce = consumes<IntProduct>(pset.getParameter<edm::InputTag>("produceTag"));
57  produces<IntProduct>();
58  }
59 
61  if (m_server) {
62  m_server->stop();
63  }
64  }
65 
67  m_server = std::make_unique<test_acquire::WaitingServer>(
69  m_server->start();
70  }
71 
72  std::unique_ptr<test_acquire::Cache> AcquireIntProducer::beginStream(edm::StreamID) const {
73  return std::make_unique<test_acquire::Cache>();
74  }
75 
77  edm::Event const& event,
78  edm::EventSetup const&,
79  edm::WaitingTaskWithArenaHolder holder) const {
80  std::this_thread::sleep_for(1s);
81 
82  test_acquire::Cache* streamCacheData = streamCache(streamID);
83  streamCacheData->retrieved().clear();
84  streamCacheData->processed().clear();
85 
86  for (auto const& token : m_tokens) {
87  streamCacheData->retrieved().push_back(event.get(token).value);
88  }
89  m_server->requestValuesAsync(
90  streamID.value(), &streamCacheData->retrieved(), &streamCacheData->processed(), holder);
91  }
92 
94  std::this_thread::sleep_for(1s);
95 
96  int sum = 0;
97  for (auto v : streamCache(streamID)->processed()) {
98  sum += v;
99  }
100  event.put(std::make_unique<IntProduct>(sum));
101 
102  // This part is here only for the Parentage test.
104  }
105 
107  if (m_server) {
108  m_server->stop();
109  }
110  m_server.reset();
111  }
112 } // namespace edmtest
113 
edm::EDGetTokenT< IntProduct > m_tokenForProduce
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
std::vector< edm::EDGetTokenT< IntProduct > > m_tokens
std::unique_ptr< test_acquire::WaitingServer > m_server
std::unique_ptr< test_acquire::Cache > beginStream(edm::StreamID) const override
void preallocate(edm::PreallocationConfiguration const &) override
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
AcquireIntProducer(edm::ParameterSet const &pset)
void produce(edm::StreamID, edm::Event &, edm::EventSetup const &) const override
const unsigned int m_secondsToWaitForWork
const unsigned int m_numberOfStreamsToAccumulate
HLT enums.
std::vector< int > const & processed() const
Definition: WaitingServer.h:62
unsigned int value() const
Definition: StreamID.h:43
void acquire(edm::StreamID, edm::Event const &, edm::EventSetup const &, edm::WaitingTaskWithArenaHolder) const override
std::vector< int > const & retrieved() const
Definition: WaitingServer.h:59
Definition: event.py:1