20 class ModuleBeginJobTraits {
23 static void preModuleSignal(ActivityRegistry* activityRegistry,
25 ModuleCallingContext
const* moduleCallingContext) {
26 activityRegistry->preModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
28 static void postModuleSignal(ActivityRegistry* activityRegistry,
30 ModuleCallingContext
const* moduleCallingContext) {
31 activityRegistry->postModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
35 class ModuleEndJobTraits {
38 static void preModuleSignal(ActivityRegistry* activityRegistry,
40 ModuleCallingContext
const* moduleCallingContext) {
41 activityRegistry->preModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
43 static void postModuleSignal(ActivityRegistry* activityRegistry,
45 ModuleCallingContext
const* moduleCallingContext) {
46 activityRegistry->postModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
50 class ModuleBeginStreamTraits {
53 static void preModuleSignal(ActivityRegistry* activityRegistry,
54 StreamContext
const* streamContext,
55 ModuleCallingContext
const* moduleCallingContext) {
56 activityRegistry->preModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
58 static void postModuleSignal(ActivityRegistry* activityRegistry,
59 StreamContext
const* streamContext,
60 ModuleCallingContext
const* moduleCallingContext) {
61 activityRegistry->postModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
65 class ModuleEndStreamTraits {
68 static void preModuleSignal(ActivityRegistry* activityRegistry,
69 StreamContext
const* streamContext,
70 ModuleCallingContext
const* moduleCallingContext) {
71 activityRegistry->preModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
73 static void postModuleSignal(ActivityRegistry* activityRegistry,
74 StreamContext
const* streamContext,
75 ModuleCallingContext
const* moduleCallingContext) {
76 activityRegistry->postModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
90 numberOfPathsLeftToRun_(0),
91 moduleCallingContext_(&iMD),
95 earlyDeleteHelper_(nullptr),
97 ranAcquireWithoutException_(
false) {
107 if (
pset and
pset->exists(
"@shouldTryToContinue")) {
115 bool shouldTryToContinue)
const noexcept {
134 if (shouldTryToContinue) {
137 return not shouldTryToContinue;
152 successTask->increment_ref_count();
160 if (not implDoPrePrefetchSelection(
id, *iPrincipal, &moduleCallingContext_)) {
161 timesRun_.fetch_add(1, std::memory_order_relaxed);
163 waitingTasks_.doneWaiting(
nullptr);
165 if (0 == successTask->decrement_ref_count()) {
178 setException<true>(std::current_exception());
179 waitingTasks_.doneWaiting(std::current_exception());
181 if (0 == successTask->decrement_ref_count()) {
186 if (0 == successTask->decrement_ref_count()) {
187 group.run([successTask]() {
189 successTask->execute();
196 std::vector<ProductResolverIndexAndSkipBit>
items;
197 itemsToGetForSelection(
items);
201 bool skipCurrentProcess =
item.skipCurrentProcess();
204 iPrincipal->prefetchAsync(
205 choiceHolder, productResolverIndex, skipCurrentProcess,
token, &moduleCallingContext_);
208 choiceHolder.doneWaiting(std::exception_ptr{});
218 auto const& recs = esRecordsToGetFrom(iTrans);
219 auto const&
items = esItemsToGetFrom(iTrans);
226 for (
size_t i = 0;
i !=
items.size(); ++
i) {
228 auto rec = iImpl.findImpl(recs[
i]);
238 Principal const& iPrincipal)
const noexcept {
240 std::vector<ProductResolverIndexAndSkipBit>
const&
items = itemsToGetFrom(iPrincipal.branchType());
244 bool skipCurrentProcess =
item.skipCurrentProcess();
246 iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess,
token, &moduleCallingContext_);
255 size_t iTransformIndex,
265 [
this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr
const* iExcept)
mutable {
267 actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
269 iTask.doneWaiting(*iExcept);
272 implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
276 actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
277 iPrincipal.prefetchAsync(
278 WaitingTaskHolder(*iTask.group(),
task), itemToGetForTransform(iTransformIndex),
false, iToken, &mcc);
300 sentry.preModuleSignal();
302 sentry.postModuleSignal();
321 sentry.preModuleSignal();
323 sentry.postModuleSignal();
340 sentry.preModuleSignal();
342 sentry.postModuleSignal();
361 sentry.preModuleSignal();
363 sentry.postModuleSignal();
405 timesRun_.fetch_add(1, std::memory_order_relaxed);
415 ranAcquireWithoutException_ =
false;
416 std::exception_ptr exceptionPtr;
418 if (shouldRethrowException(iEPtr, parentContext,
true, shouldTryToContinue_)) {
419 exceptionPtr = iEPtr;
425 runAcquire(eventTransitionInfo, parentContext, holder);
426 ranAcquireWithoutException_ =
true;
428 exceptionPtr = std::current_exception();
432 holder.doneWaiting(exceptionPtr);
437 if (ranAcquireWithoutException_) {
443 return std::current_exception();
450 oneapi::tbb::task_group*
group,
453 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(
group), m_parentContext(parentContext) {}
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const noexcept
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
std::atomic< int > numberOfPathsLeftToRun_
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool getMapped(key_type const &k, value_type &result) const
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
std::exception_ptr exceptionPtr() const noexcept
Returns exception thrown by dependent task.
ModuleCallingContext const * previousModuleOnThread() const noexcept
ParentContext const m_parentContext
WaitingTask * m_runModuleTask
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *) noexcept
std::shared_ptr< ActivityRegistry > actReg_
void beginStream(StreamID, StreamContext const &)
ActivityRegistry * activityRegistry()
std::string const & category() const
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
oneapi::tbb::task_group * m_group
ModuleCallingContext moduleCallingContext_
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const noexcept
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext) noexcept
std::exception_ptr cached_exception_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ParameterSetID const & parameterSetID() const
edm::WaitingTaskList waitingTasks_
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void endStream(StreamID, StreamContext const &)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
FunctorWaitingTask< F > * make_waiting_task(F f)
ModuleDescription const * description() const noexcept
virtual void implEndJob()=0
ServiceToken lock() const
void doneWaiting(std::exception_ptr iExcept) noexcept
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext) noexcept
ParentContext const & parent() const noexcept
virtual void implBeginJob()=0
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *) noexcept
void skipOnPath(EventPrincipal const &iEvent)
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
State state() const noexcept
void checkForShouldTryToContinue(ModuleDescription const &)
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void endJob(GlobalContext const &)
void postDoEvent(EventPrincipal const &)
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &) noexcept
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
std::atomic< int > timesRun_
void beginJob(GlobalContext const &)
virtual size_t transformIndex(edm::BranchDescription const &) const noexcept=0
static Registry * instance()
bool shouldTryToContinue_
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)