src
HeterogeneousCore
AlpakaCore
src
alpaka
EDMetadata.cc
Go to the documentation of this file.
1
#include <alpaka/alpaka.hpp>
2
3
#include "
FWCore/Concurrency/interface/Async.h
"
4
#include "
FWCore/ServiceRegistry/interface/Service.h
"
5
#include "
FWCore/Utilities/interface/EDMException.h
"
6
#include "
HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h
"
7
8
namespace
ALPAKA_ACCELERATOR_NAMESPACE
{
9
#ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
10
EDMetadata::~EDMetadata() {
  // The destructor blocks until the asynchronous production of the
  // product has finished, so that the EDM stream cannot proceed to the
  // next event while work for the current one is still in flight.
  //
  // TODO: notifying a WaitingTaskHolder (or similar) through a callback
  // would avoid blocking the CPU, at the cost of more work.
  //
  // No wait is needed when
  //  - event_ is null: the EDMetadata was default-constructed, or was
  //    fully synchronized before leaving the produce() call;
  //  - eventComplete_ is set: the event was already seen as complete;
  //  - mayReuseQueue_ is false: the queue was re-used, so some other
  //    EDMetadata object in the same edm::Event records the event_ (in
  //    the same queue) and calls alpaka::wait() itself.
  const bool mustWait = event_ and not eventComplete_ and mayReuseQueue_;
  if (not mustWait) {
    return;
  }

  // A destructor must not throw, and an exception could not really be
  // propagated from here anyway, so anything thrown is discarded.
  CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
  }
}
33
34
// Records the event and asks the edm::Async service to wait for it
// asynchronously on behalf of `holder`.
//
// holder: waiting-task holder handed over (moved) to edm::Async::runAsync().
void EDMetadata::enqueueCallback(edm::WaitingTaskWithArenaHolder holder) {
  edm::Service<edm::Async> async;
  recordEvent();

  // The closure keeps its own copy of the shared event pointer and waits
  // on that copy rather than on the member.
  auto waitForEvent = [recorded = event_]() { alpaka::wait(*recorded); };

  // Text supplied to runAsync() as its error-context function.
  auto errorContext = []() {
    return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::EDMetadata::enqueueCallback()";
  };

  async->runAsync(std::move(holder), std::move(waitForEvent), std::move(errorContext));
}
42
43
// Makes the work of `consumer` wait for the work recorded by this
// EDMetadata, or lets `consumer` take over this object's queue.
//
// consumer:      EDMetadata whose queue_ is compared against, replaced by,
//                or made to wait for this object's queue/event.
// tryReuseQueue: when true, first attempt to hand this object's queue
//                over to `consumer` via tryReuseQueue_().
//
// Throws edm::Exception (edm::errors::LogicError) if the two queues
// belong to different devices.
void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const {
  // Both sides already use the same queue: nothing to do.
  if (*queue_ == *consumer.queue_) {
    return;
  }

  // If requested, and this caller is the one that obtains the queue from
  // tryReuseQueue_(), the consumer simply continues in this queue.
  if (tryReuseQueue and tryReuseQueue_()) {
    consumer.queue_ = queue_;
    return;
  }

  // The event was already observed as complete earlier.
  if (eventComplete_) {
    return;
  }

  // TODO: how necessary this check is?
  if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
    throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
  }

  // If the event has been discarded, the produce() function that
  // constructed this EDMetadata object did not launch any asynchronous
  // work.
  if (not event_) {
    return;
  }

  if (not alpaka::isComplete(*event_)) {
    // The event has not occurred yet, so synchronization has to be added
    // here. Synchronization is done by making the consumer's queue wait
    // for the event: all subsequent work in that queue runs only after
    // the event has "occurred" (i.e. the data product became available).
    alpaka::wait(*consumer.queue_, *event_);
    return;
  }

  // Remember the completion so that later calls can return early.
  eventComplete_ = true;
}
82
83
std::shared_ptr<Queue>
EDMetadata::tryReuseQueue_
()
const
{
84
bool
expected =
true
;
85
if
(
mayReuseQueue_
.compare_exchange_strong(expected,
false
)) {
86
// If the current thread is the one flipping the flag, it may
87
// reuse the queue.
88
return
queue_
;
89
}
90
return
nullptr
;
91
}
92
#endif
93
}
// namespace ALPAKA_ACCELERATOR_NAMESPACE
SequenceTypes.wait
def wait(seq)
Definition:
SequenceTypes.py:645
CMS_SA_ALLOW
#define CMS_SA_ALLOW
edm::Service
Definition:
Service.h:30
Exception
Definition:
hltDiff.cc:245
Async.h
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::event_
std::shared_ptr< Event > event_
Definition:
EDMetadata.h:91
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata
Definition:
EDMetadata.h:61
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::enqueueCallback
void enqueueCallback(edm::WaitingTaskWithArenaHolder holder)
Definition:
EDMetadata.cc:34
edm::WaitingTaskWithArenaHolder
Definition:
WaitingTaskWithArenaHolder.h:34
ALPAKA_ACCELERATOR_NAMESPACE
Definition:
SiPixelCablingSoAESProducer.cc:21
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::queue_
std::shared_ptr< Queue > queue_
Definition:
EDMetadata.h:90
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::queue
Queue & queue() const
Definition:
EDMetadata.h:71
EDMException.h
edm::Async::runAsync
void runAsync(WaitingTaskWithArenaHolder holder, F &&func, G &&errorContextFunc)
Definition:
Async.h:21
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::synchronize
void synchronize(EDMetadata &consumer, bool tryReuseQueue) const
Definition:
EDMetadata.cc:43
Service.h
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::eventComplete_
std::atomic< bool > eventComplete_
Definition:
EDMetadata.h:97
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::tryReuseQueue_
std::shared_ptr< Queue > tryReuseQueue_() const
Definition:
EDMetadata.cc:83
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::mayReuseQueue_
std::atomic< bool > mayReuseQueue_
Definition:
EDMetadata.h:95
EDMetadata.h
EDM_STRINGIZE
#define EDM_STRINGIZE(token)
Definition:
stringize.h:6
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::~EDMetadata
~EDMetadata()
Definition:
EDMetadata.cc:10
ALPAKA_ACCELERATOR_NAMESPACE::EDMetadata::recordEvent
void recordEvent()
Definition:
EDMetadata.h:75
eostools.move
def move(src, dest)
Definition:
eostools.py:511
OfflineOutput_cfi.consumer
consumer
Definition:
OfflineOutput_cfi.py:3
event
Definition:
event.py:1
edm::errors::LogicError
Definition:
EDMException.h:37
Generated for CMSSW Reference Manual by
1.8.14