CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata Class Reference

#include <EDMetadata.h>

Public Member Functions

Device device () const
 
void discardEvent ()
 
 EDMetadata (std::shared_ptr< Queue > queue, std::shared_ptr< Event > event)
 
void enqueueCallback (edm::WaitingTaskWithArenaHolder holder)
 
Queue & queue () const
 
void recordEvent ()
 
void synchronize (EDMetadata &consumer, bool tryReuseQueue) const
 
 ~EDMetadata ()
 

Private Member Functions

std::shared_ptr< Queue > tryReuseQueue_ () const
 

Private Attributes

std::shared_ptr< Eventevent_
 
std::atomic< bool > eventComplete_ = false
 
std::atomic< bool > mayReuseQueue_ = true
 
std::shared_ptr< Queue > queue_
 

Detailed Description

The EDMetadata class provides the exact synchronization mechanisms for Event data products for backends with asynchronous Queue. These include

For synchronous backends the EDMetadata acts as an owner of the Queue object, as no further synchronization is needed.

EDMetadata is used as the Metadata class for edm::DeviceProduct<T>, and is an implementation detail (not visible to user code).

TODO: What to do with device-synchronous backends? The data product needs to be wrapped into the edm::DeviceProduct, but the EDMetadata class used there does not need anything except "dummy" implementation of synchronize(). The question is clearly solvable, so maybe leave it to the time we would actually need one?

Definition at line 61 of file EDMetadata.h.

Constructor & Destructor Documentation

◆ EDMetadata()

ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::EDMetadata ( std::shared_ptr< Queue >  queue,
std::shared_ptr< Event event 
)
inline

Definition at line 63 of file EDMetadata.h.

std::shared_ptr< Event > event_
Definition: EDMetadata.h:91
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:90
def move(src, dest)
Definition: eostools.py:511
Definition: event.py:1

◆ ~EDMetadata()

ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::~EDMetadata ( )

Definition at line 10 of file EDMetadata.cc.

References CMS_SA_ALLOW, event_, eventComplete_, mayReuseQueue_, and SequenceTypes::wait().

10  {
11  // Make sure that the production of the product in the GPU is
12  // complete before destructing the product. This is to make sure
13  // that the EDM stream does not move to the next event before all
14  // asynchronous processing of the current is complete.
15 
16  // TODO: a callback notifying a WaitingTaskHolder (or similar)
17  // would avoid blocking the CPU, but would also require more work.
18 
19  // If event_ is null, the EDMetadata was either
20  // default-constructed, or fully synchronized before leaving the
21  // produce() call, so no synchronization is needed.
22  // If the queue was re-used, then some other EDMetadata object in
23  // the same edm::Event records the event_ (in the same queue) and
24  // calls alpaka::wait(), and therefore this wait() call can be
25  // skipped).
26  if (event_ and not eventComplete_ and mayReuseQueue_) {
27  // Must not throw in a destructor, and if there were an
28  // exception could not really propagate it anyway.
29  CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
30  }
31  }
32  }
#define CMS_SA_ALLOW
std::shared_ptr< Event > event_
Definition: EDMetadata.h:91

Member Function Documentation

◆ device()

Device ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::device ( ) const
inline

Definition at line 67 of file EDMetadata.h.

References queue_.

67 { return alpaka::getDev(*queue_); }
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:90

◆ discardEvent()

void ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::discardEvent ( )
inline

Definition at line 76 of file EDMetadata.h.

References event_.

76 { event_.reset(); }
std::shared_ptr< Event > event_
Definition: EDMetadata.h:91

◆ enqueueCallback()

void ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::enqueueCallback ( edm::WaitingTaskWithArenaHolder  holder)

Definition at line 34 of file EDMetadata.cc.

References EDM_STRINGIZE, event_, eostools::move(), recordEvent(), edm::Async::runAsync(), and SequenceTypes::wait().

34  {
36  recordEvent();
37  async->runAsync(
38  std::move(holder),
39  [event = event_]() mutable { alpaka::wait(*event); },
40  []() { return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::EDMetadata::enqueueCallback()"; });
41  }
std::shared_ptr< Event > event_
Definition: EDMetadata.h:91
void runAsync(WaitingTaskWithArenaHolder holder, F &&func, G &&errorContextFunc)
Definition: Async.h:21
#define EDM_STRINGIZE(token)
Definition: stringize.h:6
def move(src, dest)
Definition: eostools.py:511
Definition: event.py:1

◆ queue()

Queue& ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::queue ( ) const
inline

Definition at line 71 of file EDMetadata.h.

References queue_.

Referenced by synchronize().

71 { return *queue_; }
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:90

◆ recordEvent()

void ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::recordEvent ( )
inline

Definition at line 75 of file EDMetadata.h.

References event_, and queue_.

Referenced by enqueueCallback().

75 { alpaka::enqueue(*queue_, *event_); }
std::shared_ptr< Event > event_
Definition: EDMetadata.h:91
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:90

◆ synchronize()

void ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::synchronize ( EDMetadata consumer,
bool  tryReuseQueue 
) const

Synchronizes 'consumer' metadata wrt. 'this' in the event product

Definition at line 43 of file EDMetadata.cc.

References OfflineOutput_cfi::consumer, event_, eventComplete_, Exception, edm::errors::LogicError, queue(), queue_, tryReuseQueue_(), and SequenceTypes::wait().

43  {
44  if (*queue_ == *consumer.queue_) {
45  return;
46  }
47 
48  if (tryReuseQueue) {
49  if (auto queue = tryReuseQueue_()) {
50  consumer.queue_ = queue_;
51  return;
52  }
53  }
54 
55  if (eventComplete_) {
56  return;
57  }
58 
59  // TODO: how necessary this check is?
60  if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
61  throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
62  }
63 
64  // If the event has been discarded, the produce() function that
65  // constructed this EDMetadata object did not launch any
66  // asynchronous work.
67  if (not event_) {
68  return;
69  }
70 
71  if (alpaka::isComplete(*event_)) {
72  eventComplete_ = true;
73  } else {
74  // Event not yet occurred, so need to add synchronization
75  // here. Sychronization is done by making the queue to wait
76  // for an event, so all subsequent work in the queue will run
77  // only after the event has "occurred" (i.e. data product
78  // became available).
79  alpaka::wait(*consumer.queue_, *event_);
80  }
81  }
std::shared_ptr< Event > event_
Definition: EDMetadata.h:91
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:90
std::shared_ptr< Queue > tryReuseQueue_() const
Definition: EDMetadata.cc:83

◆ tryReuseQueue_()

std::shared_ptr< Queue > ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::tryReuseQueue_ ( ) const
private

Returns a shared_ptr to the Queue if it can be reused, or a null shared_ptr if not

Definition at line 83 of file EDMetadata.cc.

References mayReuseQueue_, and queue_.

Referenced by synchronize().

83  {
84  bool expected = true;
85  if (mayReuseQueue_.compare_exchange_strong(expected, false)) {
86  // If the current thread is the one flipping the flag, it may
87  // reuse the queue.
88  return queue_;
89  }
90  return nullptr;
91  }
std::shared_ptr< Queue > queue_
Definition: EDMetadata.h:90

Member Data Documentation

◆ event_

std::shared_ptr<Event> ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::event_
private

Definition at line 91 of file EDMetadata.h.

Referenced by discardEvent(), enqueueCallback(), recordEvent(), synchronize(), and ~EDMetadata().

◆ eventComplete_

std::atomic<bool> ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::eventComplete_ = false
mutableprivate

Definition at line 97 of file EDMetadata.h.

Referenced by synchronize(), and ~EDMetadata().

◆ mayReuseQueue_

std::atomic<bool> ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::mayReuseQueue_ = true
mutableprivate

Definition at line 95 of file EDMetadata.h.

Referenced by tryReuseQueue_(), and ~EDMetadata().

◆ queue_

std::shared_ptr<Queue> ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::queue_
private

Definition at line 90 of file EDMetadata.h.

Referenced by device(), queue(), recordEvent(), synchronize(), and tryReuseQueue_().