CMS 3D CMS Logo

EDMetadata.cc
Go to the documentation of this file.
1 #include <alpaka/alpaka.hpp>
2 
7 
9 #ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
11  // Make sure that the production of the product in the GPU is
12  // complete before destructing the product. This is to make sure
13  // that the EDM stream does not move to the next event before all
14  // asynchronous processing of the current is complete.
15 
16  // TODO: a callback notifying a WaitingTaskHolder (or similar)
17  // would avoid blocking the CPU, but would also require more work.
18 
19  // If event_ is null, the EDMetadata was either
20  // default-constructed, or fully synchronized before leaving the
21  // produce() call, so no synchronization is needed.
22  // If the queue was re-used, then some other EDMetadata object in
23  // the same edm::Event records the event_ (in the same queue) and
24  // calls alpaka::wait(), and therefore this wait() call can be
25  // skipped).
26  if (event_ and not eventComplete_ and mayReuseQueue_) {
27  // Must not throw in a destructor, and if there were an
28  // exception could not really propagate it anyway.
29  CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
30  }
31  }
32  }
33 
36  recordEvent();
37  async->runAsync(
38  std::move(holder),
39  [event = event_]() mutable { alpaka::wait(*event); },
40  []() { return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::EDMetadata::enqueueCallback()"; });
41  }
42 
43  void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
44  if (*queue_ == *consumer.queue_) {
45  return;
46  }
47 
48  if (tryReuseQueue) {
49  if (auto queue = tryReuseQueue_()) {
50  consumer.queue_ = queue_;
51  return;
52  }
53  }
54 
55  if (eventComplete_) {
56  return;
57  }
58 
59  // TODO: how necessary this check is?
60  if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
61  throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
62  }
63 
64  // If the event has been discarded, the produce() function that
65  // constructed this EDMetadata object did not launch any
66  // asynchronous work.
67  if (not event_) {
68  return;
69  }
70 
71  if (alpaka::isComplete(*event_)) {
72  eventComplete_ = true;
73  } else {
74  // Event not yet occurred, so need to add synchronization
75  // here. Sychronization is done by making the queue to wait
76  // for an event, so all subsequent work in the queue will run
77  // only after the event has "occurred" (i.e. data product
78  // became available).
79  alpaka::wait(*consumer.queue_, *event_);
80  }
81  }
82 
83  std::shared_ptr<Queue> EDMetadata::tryReuseQueue_() const {
84  bool expected = true;
85  if (mayReuseQueue_.compare_exchange_strong(expected, false)) {
86  // If the current thread is the one flipping the flag, it may
87  // reuse the queue.
88  return queue_;
89  }
90  return nullptr;
91  }
92 #endif
93 } // namespace ALPAKA_ACCELERATOR_NAMESPACE
#define CMS_SA_ALLOW
std::shared_ptr< Event > event_
Definition: EDMetadata.h:91
void enqueueCallback(edm::WaitingTaskWithArenaHolder holder)
Definition: EDMetadata.cc:34
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:90
void runAsync(WaitingTaskWithArenaHolder holder, F &&func, G &&errorContextFunc)
Definition: Async.h:21
void synchronize(EDMetadata &consumer, bool tryReuseQueue) const
Definition: EDMetadata.cc:43
std::shared_ptr< Queue > tryReuseQueue_() const
Definition: EDMetadata.cc:83
#define EDM_STRINGIZE(token)
Definition: stringize.h:6
def move(src, dest)
Definition: eostools.py:511
Definition: event.py:1