CMS 3D CMS Logo

EDMetadata.cc
Go to the documentation of this file.
1 #include <alpaka/alpaka.hpp>
2 
5 
7 #ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
9  // Make sure that the production of the product in the GPU is
10  // complete before destructing the product. This is to make sure
11  // that the EDM stream does not move to the next event before all
12  // asynchronous processing of the current event is complete.
13 
14  // TODO: a callback notifying a WaitingTaskHolder (or similar)
15  // would avoid blocking the CPU, but would also require more work.
16 
17  if (event_) {
18  // Must not throw in a destructor, and if there were an
19  // exception could not really propagate it anyway.
20  CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
21  }
22  }
23  }
24 
26  alpaka::enqueue(*queue_, alpaka::HostOnlyTask([holder = std::move(holder)](std::exception_ptr eptr) {
27  // The functor is required to be const, but the original waitingTaskHolder_
28  // needs to be notified...
29  const_cast<edm::WaitingTaskWithArenaHolder&>(holder).doneWaiting(eptr);
30  }));
31  }
32 
33  void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
34  if (*queue_ == *consumer.queue_) {
35  return;
36  }
37 
38  if (tryReuseQueue) {
39  if (auto queue = tryReuseQueue_()) {
40  consumer.queue_ = queue_;
41  return;
42  }
43  }
44 
45  // TODO: how necessary this check is?
46  if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
47  throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
48  }
49 
50  if (not alpaka::isComplete(*event_)) {
51  // Event not yet occurred, so need to add synchronization
52  // here. Sychronization is done by making the queue to wait
53  // for an event, so all subsequent work in the queue will run
54  // only after the event has "occurred" (i.e. data product
55  // became available).
56  alpaka::wait(*consumer.queue_, *event_);
57  }
58  }
59 
60  std::shared_ptr<Queue> EDMetadata::tryReuseQueue_() const {
61  bool expected = true;
62  if (mayReuseQueue_.compare_exchange_strong(expected, false)) {
63  // If the current thread is the one flipping the flag, it may
64  // reuse the queue.
65  return queue_;
66  }
67  return nullptr;
68  }
69 #endif
70 } // namespace ALPAKA_ACCELERATOR_NAMESPACE
#define CMS_SA_ALLOW
std::shared_ptr< Event > event_
Definition: EDMetadata.h:90
void enqueueCallback(edm::WaitingTaskWithArenaHolder holder)
Definition: EDMetadata.cc:25
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:89
void synchronize(EDMetadata &consumer, bool tryReuseQueue) const
Definition: EDMetadata.cc:33
std::shared_ptr< Queue > tryReuseQueue_() const
Definition: EDMetadata.cc:60
def move(src, dest)
Definition: eostools.py:511