// Sentry object: emits preModuleBeginJobSignal_ on construction and
// postModuleBeginJobSignal_ on destruction, both on the supplied ActivityRegistry.
// NOTE(review): this listing is an excerpt -- access specifiers, closing braces and
// the declaration of a_ are not visible here.
20 class ModuleBeginJobSignalSentry {
// Keeps the registry pointer in a_ and the ADDRESS of md in md_ (no copy is made),
// so md has to stay alive until the destructor has run.
22 ModuleBeginJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
// a_ is dereferenced with no null check visible in this excerpt --
// presumably callers always pass a valid registry; TODO confirm.
24 a_->preModuleBeginJobSignal_(*md_);
// The destructor emits the matching "post" signal with the same ModuleDescription.
26 ~ModuleBeginJobSignalSentry() {
28 a_->postModuleBeginJobSignal_(*md_);
// Non-owning pointer to the description passed to the constructor.
33 ModuleDescription
const* md_;
// Sentry object: emits preModuleEndJobSignal_ on construction and
// postModuleEndJobSignal_ on destruction, both on the supplied ActivityRegistry.
// NOTE(review): this listing is an excerpt -- access specifiers, closing braces and
// the declaration of a_ are not visible here.
36 class ModuleEndJobSignalSentry {
// Keeps the registry pointer in a_ and the ADDRESS of md in md_ (no copy is made),
// so md has to stay alive until the destructor has run.
38 ModuleEndJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
// a_ is dereferenced with no null check visible in this excerpt --
// presumably callers always pass a valid registry; TODO confirm.
40 a_->preModuleEndJobSignal_(*md_);
// The destructor emits the matching "post" signal with the same ModuleDescription.
42 ~ModuleEndJobSignalSentry() {
44 a_->postModuleEndJobSignal_(*md_);
// Non-owning pointer to the description passed to the constructor.
49 ModuleDescription
const* md_;
// Sentry object: emits preModuleBeginStreamSignal_ on construction and
// postModuleBeginStreamSignal_ on destruction, both on the supplied ActivityRegistry.
// NOTE(review): this listing is an excerpt -- access specifiers, closing braces and
// the declaration of a_ are not visible here.
52 class ModuleBeginStreamSignalSentry {
// Binds sc and mcc to the reference members sc_ and mcc_ (no copies are made),
// so both arguments have to stay alive until the destructor has run.
54 ModuleBeginStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
55 : a_(
a), sc_(sc), mcc_(mcc) {
// a_ is dereferenced with no null check visible in this excerpt --
// presumably callers always pass a valid registry; TODO confirm.
57 a_->preModuleBeginStreamSignal_(sc_, mcc_);
// The destructor emits the matching "post" signal with the same two contexts.
59 ~ModuleBeginStreamSignalSentry() {
61 a_->postModuleBeginStreamSignal_(sc_, mcc_);
// Reference members: they refer to the caller's objects and are not owned here.
66 StreamContext
const& sc_;
67 ModuleCallingContext
const& mcc_;
// Sentry object: emits preModuleEndStreamSignal_ on construction and
// postModuleEndStreamSignal_ on destruction, both on the supplied ActivityRegistry.
// NOTE(review): this listing is an excerpt -- access specifiers, closing braces and
// the declaration of a_ are not visible here.
70 class ModuleEndStreamSignalSentry {
// Binds sc and mcc to the reference members sc_ and mcc_ (no copies are made),
// so both arguments have to stay alive until the destructor has run.
72 ModuleEndStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
73 : a_(
a), sc_(sc), mcc_(mcc) {
// a_ is dereferenced with no null check visible in this excerpt --
// presumably callers always pass a valid registry; TODO confirm.
75 a_->preModuleEndStreamSignal_(sc_, mcc_);
// The destructor emits the matching "post" signal with the same two contexts.
77 ~ModuleEndStreamSignalSentry() {
79 a_->postModuleEndStreamSignal_(sc_, mcc_);
// Reference members: they refer to the caller's objects and are not owned here.
84 StreamContext
const& sc_;
85 ModuleCallingContext
const& mcc_;
98 numberOfPathsLeftToRun_(0),
99 moduleCallingContext_(&iMD),
103 earlyDeleteHelper_(nullptr),
105 ranAcquireWithoutException_(
false) {
115 if (
pset and
pset->exists(
"@shouldTryToContinue")) {
123 bool shouldTryToContinue)
const {
142 if (shouldTryToContinue) {
145 return not shouldTryToContinue;
169 timesRun_.fetch_add(1, std::memory_order_relaxed);
181 group.run([successTask]() {
190 std::vector<ProductResolverIndexAndSkipBit>
items;
195 bool skipCurrentProcess =
item.skipCurrentProcess();
201 choiceHolder.doneWaiting(std::exception_ptr{});
219 for (
size_t i = 0;
i !=
items.size(); ++
i) {
235 bool skipCurrentProcess =
item.skipCurrentProcess();
246 size_t iTransformIndex,
256 [
this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr
const* iExcept)
mutable {
291 std::ostringstream ost;
304 ModuleEndJobSignalSentry cpp(
actReg_.get(), *
desc);
309 std::ostringstream ost;
332 std::ostringstream ost;
356 std::ostringstream ost;
397 timesRun_.fetch_add(1, std::memory_order_relaxed);
408 std::exception_ptr exceptionPtr;
411 exceptionPtr = iEPtr;
417 runAcquire(eventTransitionInfo, parentContext, holder);
420 exceptionPtr = std::current_exception();
434 return std::current_exception();
441 oneapi::tbb::task_group*
group,
444 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(
group), m_parentContext(parentContext) {}
447 auto excptr = exceptionPtr();
450 holder.
doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
ModuleCallingContext const * previousModuleOnThread() const
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
void setTimestamp(Timestamp const &v)
void setState(State state)
std::atomic< int > numberOfPathsLeftToRun_
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool getMapped(key_type const &k, value_type &result) const
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
void prefetchAsync(WaitingTaskHolder iTask, ESResolverIndex iResolverIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
Prefetch the data to set up for subsequent calls to getImplementation.
std::shared_ptr< ActivityRegistry > actReg_
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
std::string const & moduleName() const
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
std::string const & category() const
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
void beginStream(StreamID id, StreamContext &streamContext)
ExceptionToActionTable const * actions_
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
ModuleCallingContext moduleCallingContext_
void setTransition(Transition v)
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
std::exception_ptr cached_exception_
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
oneapi::tbb::task_group * group() const noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ParameterSetID const & parameterSetID() const
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
void doneWaiting(std::exception_ptr iExcept)
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
std::atomic< State > state_
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
virtual size_t transformIndex(edm::BranchDescription const &) const =0
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
ModuleDescription const * description() const
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginJob()=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
void skipOnPath(EventPrincipal const &iEvent)
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
virtual void implEndStream(StreamID)=0
void setEventID(EventID const &v)
StreamContext const * getStreamContext() const
void checkForShouldTryToContinue(ModuleDescription const &)
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void endStream(StreamID id, StreamContext &streamContext)
void postDoEvent(EventPrincipal const &)
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
std::atomic< int > timesRun_
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
std::string const & moduleLabel() const
void setRunIndex(RunIndex const &v)
static Registry * instance()
void increment_ref_count()
BranchType const & branchType() const
bool shouldTryToContinue_
ParentContext const & parent() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0