17 #include "tbb/global_control.h"
// Helper that brackets a module's begin-job call with ActivityRegistry signals:
// constructing it emits preModuleBeginJobSignal_, destroying it emits
// postModuleBeginJobSignal_, both for the same ModuleDescription.
21 class ModuleBeginJobSignalSentry {
// a  : registry whose signals are emitted; it is dereferenced without a null check.
// md : module description passed to both signals; only its address is stored,
//      so the referenced object has to stay alive until the sentry is destroyed.
23 ModuleBeginJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(a), md_(&md) {
// Emit the "pre" signal as soon as the sentry exists.
25 a_->preModuleBeginJobSignal_(*md_);
// The destructor emits the matching "post" signal.
27 ~ModuleBeginJobSignalSentry() {
29 a_->postModuleBeginJobSignal_(*md_);
// Non-owning pointer to the description handed to the constructor.
34 ModuleDescription
const* md_;
// Helper that brackets a module's end-job call with ActivityRegistry signals:
// constructing it emits preModuleEndJobSignal_, destroying it emits
// postModuleEndJobSignal_, both for the same ModuleDescription.
37 class ModuleEndJobSignalSentry {
// a  : registry whose signals are emitted; it is dereferenced without a null check.
// md : module description passed to both signals; only its address is stored,
//      so the referenced object has to stay alive until the sentry is destroyed.
39 ModuleEndJobSignalSentry(
ActivityRegistry* a, ModuleDescription
const& md) : a_(a), md_(&md) {
// Emit the "pre" signal as soon as the sentry exists.
41 a_->preModuleEndJobSignal_(*md_);
// The destructor emits the matching "post" signal.
43 ~ModuleEndJobSignalSentry() {
45 a_->postModuleEndJobSignal_(*md_);
// Non-owning pointer to the description handed to the constructor.
50 ModuleDescription
const* md_;
// Helper that brackets a module's begin-stream call with ActivityRegistry signals:
// constructing it emits preModuleBeginStreamSignal_, destroying it emits
// postModuleBeginStreamSignal_, both with the same stream and module contexts.
53 class ModuleBeginStreamSignalSentry {
// a   : registry whose signals are emitted; it is dereferenced without a null check.
// sc  : stream context forwarded to both signals.
// mcc : module calling context forwarded to both signals.
// sc and mcc are kept as references, so both objects have to stay alive until
// the sentry is destroyed.
55 ModuleBeginStreamSignalSentry(
ActivityRegistry* a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
56 : a_(a), sc_(sc), mcc_(mcc) {
// Emit the "pre" signal as soon as the sentry exists.
58 a_->preModuleBeginStreamSignal_(sc_, mcc_);
// The destructor emits the matching "post" signal.
60 ~ModuleBeginStreamSignalSentry() {
62 a_->postModuleBeginStreamSignal_(sc_, mcc_);
// Reference members bound to the constructor arguments (not copies).
67 StreamContext
const& sc_;
68 ModuleCallingContext
const& mcc_;
// Helper that brackets a module's end-stream call with ActivityRegistry signals:
// constructing it emits preModuleEndStreamSignal_, destroying it emits
// postModuleEndStreamSignal_, both with the same stream and module contexts.
71 class ModuleEndStreamSignalSentry {
// a   : registry whose signals are emitted; it is dereferenced without a null check.
// sc  : stream context forwarded to both signals.
// mcc : module calling context forwarded to both signals.
// sc and mcc are kept as references, so both objects have to stay alive until
// the sentry is destroyed.
73 ModuleEndStreamSignalSentry(
ActivityRegistry* a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
74 : a_(a), sc_(sc), mcc_(mcc) {
// Emit the "pre" signal as soon as the sentry exists.
76 a_->preModuleEndStreamSignal_(sc_, mcc_);
// The destructor emits the matching "post" signal.
78 ~ModuleEndStreamSignalSentry() {
80 a_->postModuleEndStreamSignal_(sc_, mcc_);
// Reference members bound to the constructor arguments (not copies).
85 StreamContext
const& sc_;
86 ModuleCallingContext
const& mcc_;
99 numberOfPathsLeftToRun_(0),
100 moduleCallingContext_(&iMD),
104 earlyDeleteHelper_(nullptr),
106 ranAcquireWithoutException_(
false) {}
161 edm::make_waiting_task([
id, successTask, iPrincipal,
this, weakToken, &group](std::exception_ptr
const*) {
166 timesRun_.fetch_add(1, std::memory_order_relaxed);
178 group.run([successTask]() {
187 std::vector<ProductResolverIndexAndSkipBit>
items;
190 for (
auto const&
item : items) {
192 bool skipCurrentProcess =
item.skipCurrentProcess();
198 choiceHolder.doneWaiting(std::exception_ptr{});
221 if UNLIKELY (tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) == 1) {
222 auto taskGroup = iTask.
group();
224 taskGroup->run([
this, task =
std::move(iTask), iTrans, &iImpl, weakToken]() {
225 std::exception_ptr exceptPtr{};
226 iImpl.
taskArena()->execute([
this, iTrans, &iImpl, weakToken, &exceptPtr]() {
232 for (
size_t i = 0;
i !=
items.size(); ++
i) {
243 auto tempTask = task;
244 tempTask.doneWaiting(exceptPtr);
252 holder.doneWaiting(*iExcept);
254 holder.doneWaiting(std::exception_ptr{});
260 for (
size_t i = 0;
i !=
items.size(); ++
i) {
276 for (
auto const&
item : items) {
278 bool skipCurrentProcess =
item.skipCurrentProcess();
303 std::ostringstream ost;
316 ModuleEndJobSignalSentry cpp(
actReg_.get(), *
desc);
321 std::ostringstream ost;
344 std::ostringstream ost;
368 std::ostringstream ost;
409 timesRun_.fetch_add(1, std::memory_order_relaxed);
420 std::exception_ptr exceptionPtr;
424 exceptionPtr = *iEPtr;
430 runAcquire(eventTransitionInfo, parentContext, holder);
433 exceptionPtr = std::current_exception();
448 return std::current_exception();
455 tbb::task_group*
group,
458 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
461 auto excptr = exceptionPtr();
464 holder.
doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
ServiceToken lock() const
void resetModuleDescription(ModuleDescription const *)
void setTimestamp(Timestamp const &v)
ModuleDescription const * description() const
void setState(State state)
std::atomic< int > numberOfPathsLeftToRun_
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
std::exception_ptr syncWait(F &&iFunc)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
ModuleCallingContext const * getTopModuleCallingContext() const
void prefetchAsync(WaitingTaskHolder iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
Prefetch the data to set up for subsequent calls to getImplementation.
std::shared_ptr< ActivityRegistry > actReg_
std::string const & moduleName() const
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
std::string const & category() const
exception_actions::ActionCodes find(const std::string &category) const
void beginStream(StreamID id, StreamContext &streamContext)
ExceptionToActionTable const * actions_
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
ParentContext const & parent() const
ModuleCallingContext moduleCallingContext_
void setTransition(Transition v)
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
std::exception_ptr cached_exception_
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
void doneWaiting(std::exception_ptr iExcept)
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
BranchType const & branchType() const
PathContext const * pathContext() const
tbb::task_arena * taskArena() const
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
std::atomic< State > state_
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
virtual void implBeginJob()=0
void skipOnPath(EventPrincipal const &iEvent)
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
tbb::task_group * group() const noexcept
virtual void implEndStream(StreamID)=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
void setEventID(EventID const &v)
ModuleCallingContext const * previousModuleOnThread() const
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void endStream(StreamID id, StreamContext &streamContext)
void postDoEvent(EventPrincipal const &)
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
std::atomic< int > timesRun_
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
HandleExternalWorkExceptionTask(Worker *worker, tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
void increment_ref_count()
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)