19 class ModuleBeginJobSignalSentry {
21 ModuleBeginJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
23 a_->preModuleBeginJobSignal_(*md_);
25 ~ModuleBeginJobSignalSentry() {
27 a_->postModuleBeginJobSignal_(*md_);
32 ModuleDescription
const* md_;
35 class ModuleEndJobSignalSentry {
37 ModuleEndJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
39 a_->preModuleEndJobSignal_(*md_);
41 ~ModuleEndJobSignalSentry() {
43 a_->postModuleEndJobSignal_(*md_);
48 ModuleDescription
const* md_;
51 class ModuleBeginStreamSignalSentry {
53 ModuleBeginStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
54 : a_(
a), sc_(sc), mcc_(mcc) {
56 a_->preModuleBeginStreamSignal_(sc_, mcc_);
58 ~ModuleBeginStreamSignalSentry() {
60 a_->postModuleBeginStreamSignal_(sc_, mcc_);
65 StreamContext
const& sc_;
66 ModuleCallingContext
const& mcc_;
69 class ModuleEndStreamSignalSentry {
71 ModuleEndStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
72 : a_(
a), sc_(sc), mcc_(mcc) {
74 a_->preModuleEndStreamSignal_(sc_, mcc_);
76 ~ModuleEndStreamSignalSentry() {
78 a_->postModuleEndStreamSignal_(sc_, mcc_);
83 StreamContext
const& sc_;
84 ModuleCallingContext
const& mcc_;
97 numberOfPathsLeftToRun_(0),
98 moduleCallingContext_(&iMD),
102 earlyDeleteHelper_(nullptr),
104 ranAcquireWithoutException_(
false) {}
164 timesRun_.fetch_add(1, std::memory_order_relaxed);
176 group.run([successTask]() {
185 std::vector<ProductResolverIndexAndSkipBit>
items;
190 bool skipCurrentProcess =
item.skipCurrentProcess();
196 choiceHolder.doneWaiting(std::exception_ptr{});
214 for (
size_t i = 0;
i !=
items.size(); ++
i) {
230 bool skipCurrentProcess =
item.skipCurrentProcess();
241 size_t iTransformIndex,
251 std::exception_ptr
const* iExcept)
mutable {
281 std::ostringstream ost;
294 ModuleEndJobSignalSentry cpp(
actReg_.get(), *
desc);
299 std::ostringstream ost;
322 std::ostringstream ost;
346 std::ostringstream ost;
387 timesRun_.fetch_add(1, std::memory_order_relaxed);
398 std::exception_ptr exceptionPtr;
401 exceptionPtr = iEPtr;
407 runAcquire(eventTransitionInfo, parentContext, holder);
410 exceptionPtr = std::current_exception();
424 return std::current_exception();
431 oneapi::tbb::task_group*
group,
434 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(
group), m_parentContext(parentContext) {}
437 auto excptr = exceptionPtr();
440 holder.
doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
ModuleCallingContext const * previousModuleOnThread() const
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
void setTimestamp(Timestamp const &v)
void setState(State state)
std::atomic< int > numberOfPathsLeftToRun_
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
void prefetchAsync(WaitingTaskHolder iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
Prefetch the data to set up for subsequent calls to getImplementation.
std::shared_ptr< ActivityRegistry > actReg_
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
std::string const & moduleName() const
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
std::string const & category() const
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
void beginStream(StreamID id, StreamContext &streamContext)
ExceptionToActionTable const * actions_
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
ModuleCallingContext moduleCallingContext_
ModuleCallingContext const * getTopModuleCallingContext() const
void setTransition(Transition v)
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
std::exception_ptr cached_exception_
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
oneapi::tbb::task_group * group() const noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
PlaceInPathContext const * placeInPathContext() const
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
void doneWaiting(std::exception_ptr iExcept)
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
std::atomic< State > state_
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
virtual size_t transformIndex(edm::BranchDescription const &) const =0
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
ModuleDescription const * description() const
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginJob()=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
void skipOnPath(EventPrincipal const &iEvent)
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
void setEventID(EventID const &v)
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void endStream(StreamID id, StreamContext &streamContext)
void postDoEvent(EventPrincipal const &)
PathContext const * pathContext() const
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
std::atomic< int > timesRun_
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
std::string const & moduleLabel() const
void setRunIndex(RunIndex const &v)
void increment_ref_count()
BranchType const & branchType() const
ParentContext const & parent() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0