CMS 3D CMS Logo

EDMetadata.cc
Go to the documentation of this file.
1 #include <alpaka/alpaka.hpp>
2 
5 
7 #ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
9  // Make sure that the production of the product in the GPU is
10  // complete before destructing the product. This is to make sure
11  // that the EDM stream does not move to the next event before all
12  // asynchronous processing of the current is complete.
13 
14  // TODO: a callback notifying a WaitingTaskHolder (or similar)
15  // would avoid blocking the CPU, but would also require more work.
16 
17  // If event_ is null, the EDMetadata was either
18  // default-constructed, or fully synchronized before leaving the
19  // produce() call, so no synchronization is needed.
20  // If the queue was re-used, then some other EDMetadata object in
21  // the same edm::Event records the event_ (in the same queue) and
22  // calls alpaka::wait(), and therefore this wait() call can be
23  // skipped).
24  if (event_ and not eventComplete_ and mayReuseQueue_) {
25  // Must not throw in a destructor, and if there were an
26  // exception could not really propagate it anyway.
27  CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
28  }
29  }
30  }
31 
33  alpaka::enqueue(*queue_, alpaka::HostOnlyTask([holder = std::move(holder)](std::exception_ptr eptr) {
34  // The functor is required to be const, but the original waitingTaskHolder_
35  // needs to be notified...
36  const_cast<edm::WaitingTaskWithArenaHolder&>(holder).doneWaiting(eptr);
37  }));
38  }
39 
40  void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
41  if (*queue_ == *consumer.queue_) {
42  return;
43  }
44 
45  if (tryReuseQueue) {
46  if (auto queue = tryReuseQueue_()) {
47  consumer.queue_ = queue_;
48  return;
49  }
50  }
51 
52  if (eventComplete_) {
53  return;
54  }
55 
56  // TODO: how necessary this check is?
57  if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
58  throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
59  }
60 
61  // If the event has been discarded, the produce() function that
62  // constructed this EDMetadata object did not launch any
63  // asynchronous work.
64  if (not event_) {
65  return;
66  }
67 
68  if (alpaka::isComplete(*event_)) {
69  eventComplete_ = true;
70  } else {
71  // Event not yet occurred, so need to add synchronization
72  // here. Sychronization is done by making the queue to wait
73  // for an event, so all subsequent work in the queue will run
74  // only after the event has "occurred" (i.e. data product
75  // became available).
76  alpaka::wait(*consumer.queue_, *event_);
77  }
78  }
79 
80  std::shared_ptr<Queue> EDMetadata::tryReuseQueue_() const {
81  bool expected = true;
82  if (mayReuseQueue_.compare_exchange_strong(expected, false)) {
83  // If the current thread is the one flipping the flag, it may
84  // reuse the queue.
85  return queue_;
86  }
87  return nullptr;
88  }
89 #endif
90 } // namespace ALPAKA_ACCELERATOR_NAMESPACE
#define CMS_SA_ALLOW
std::shared_ptr< Event > event_
Definition: EDMetadata.h:92
void enqueueCallback(edm::WaitingTaskWithArenaHolder holder)
Definition: EDMetadata.cc:32
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:91
void synchronize(EDMetadata &consumer, bool tryReuseQueue) const
Definition: EDMetadata.cc:40
std::shared_ptr< Queue > tryReuseQueue_() const
Definition: EDMetadata.cc:80
def move(src, dest)
Definition: eostools.py:511