|
|
Go to the documentation of this file.
17 #include "tbb/global_control.h"
21 class ModuleBeginJobSignalSentry {
23 ModuleBeginJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
25 a_->preModuleBeginJobSignal_(*md_);
27 ~ModuleBeginJobSignalSentry() {
29 a_->postModuleBeginJobSignal_(*md_);
34 ModuleDescription
const* md_;
37 class ModuleEndJobSignalSentry {
39 ModuleEndJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
41 a_->preModuleEndJobSignal_(*md_);
43 ~ModuleEndJobSignalSentry() {
45 a_->postModuleEndJobSignal_(*md_);
50 ModuleDescription
const* md_;
53 class ModuleBeginStreamSignalSentry {
55 ModuleBeginStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
56 : a_(
a), sc_(sc), mcc_(mcc) {
58 a_->preModuleBeginStreamSignal_(sc_, mcc_);
60 ~ModuleBeginStreamSignalSentry() {
62 a_->postModuleBeginStreamSignal_(sc_, mcc_);
67 StreamContext
const& sc_;
68 ModuleCallingContext
const& mcc_;
71 class ModuleEndStreamSignalSentry {
73 ModuleEndStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
74 : a_(
a), sc_(sc), mcc_(mcc) {
76 a_->preModuleEndStreamSignal_(sc_, mcc_);
78 ~ModuleEndStreamSignalSentry() {
80 a_->postModuleEndStreamSignal_(sc_, mcc_);
85 StreamContext
const& sc_;
86 ModuleCallingContext
const& mcc_;
99 numberOfPathsLeftToRun_(0),
100 moduleCallingContext_(&iMD),
104 earlyDeleteHelper_(nullptr),
106 ranAcquireWithoutException_(
false) {}
115 std::ostringstream iost;
117 iost <<
"Prefetching for module ";
119 iost <<
"Calling method for module ";
124 iost <<
" (probably inside some kind of mixing module)";
131 std::ostringstream ost;
133 ost <<
"Prefetching for module ";
135 ost <<
"Calling method for module ";
142 ost <<
"Running path '";
218 timesRun_.fetch_add(1, std::memory_order_relaxed);
230 group.run([successTask]() {
239 std::vector<ProductResolverIndexAndSkipBit>
items;
244 bool skipCurrentProcess =
item.skipCurrentProcess();
250 choiceHolder.doneWaiting(std::exception_ptr{});
273 if UNLIKELY (tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) == 1) {
274 auto taskGroup = iTask.
group();
276 taskGroup->run([
this,
task =
std::move(iTask), iTrans, &iImpl, weakToken]() {
277 std::exception_ptr exceptPtr{};
278 iImpl.
taskArena()->execute([
this, iTrans, &iImpl, weakToken, &exceptPtr]() {
284 for (
size_t i = 0;
i !=
items.size(); ++
i) {
295 auto tempTask =
task;
296 tempTask.doneWaiting(exceptPtr);
304 holder.doneWaiting(*iExcept);
306 holder.doneWaiting(std::exception_ptr{});
312 for (
size_t i = 0;
i !=
items.size(); ++
i) {
330 bool skipCurrentProcess =
item.skipCurrentProcess();
355 std::ostringstream ost;
368 ModuleEndJobSignalSentry cpp(
actReg_.get(), *
desc);
373 std::ostringstream ost;
396 std::ostringstream ost;
420 std::ostringstream ost;
461 timesRun_.fetch_add(1, std::memory_order_relaxed);
472 std::exception_ptr exceptionPtr;
476 exceptionPtr = *iEPtr;
482 runAcquire(eventTransitionInfo, parentContext, holder);
485 exceptionPtr = std::current_exception();
500 return std::current_exception();
507 tbb::task_group*
group,
510 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(
group), m_parentContext(parentContext) {}
513 auto excptr = exceptionPtr();
516 holder.
doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
std::string const & moduleLabel() const
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
ModuleCallingContext const * moduleCallingContext() const
void exceptionContext(std::ostream &, GlobalContext const &)
unsigned int ProductResolverIndex
void addContext(std::string const &context)
StreamContext const * streamContext() const
std::shared_ptr< ActivityRegistry > actReg_
virtual void implEndJob()=0
void setTransition(Transition v)
PlaceInPathContext const * placeInPathContext() const
GlobalContext const * globalContext() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void resetModuleDescription(ModuleDescription const *)
std::string const & moduleName() const
void postDoEvent(EventPrincipal const &)
ExceptionToActionTable const * actions_
void setTimestamp(Timestamp const &v)
StreamContext const * streamContext() const
virtual void implBeginJob()=0
PathContext const * pathContext() const
HandleExternalWorkExceptionTask(Worker *worker, tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
std::atomic< int > numberOfPathsLeftToRun_
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
ModuleDescription const * moduleDescription() const
void doneWaiting(std::exception_ptr iExcept)
void prefetchAsync(WaitingTaskHolder iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
prefetch the data to setup for subsequent calls to getImplementation
virtual void implEndStream(StreamID)=0
ParentContext const & parent() const
ModuleCallingContext const * moduleCallingContext() const
ModuleDescription const * description() const
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
std::atomic< int > timesRun_
InternalContext const * internalContext() const
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< State > state_
BranchType const & branchType() const
tbb::task_arena * taskArena() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
edm::WaitingTaskList waitingTasks_
FunctorWaitingTask< F > * make_waiting_task(F f)
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleCallingContext const * previousModuleOnThread() const
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setState(State state)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
bool ranAcquireWithoutException_
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
void setEventID(EventID const &v)
ModuleCallingContext moduleCallingContext_
ServiceToken lock() const
void increment_ref_count()
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
std::exception_ptr cached_exception_
static RunIndex invalidRunIndex()
ModuleCallingContext const * getTopModuleCallingContext() const
std::string const & pathName() const
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
unsigned int decrement_ref_count()
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void setRunIndex(RunIndex const &v)
void skipOnPath(EventPrincipal const &iEvent)
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginStream(StreamID)=0
std::exception_ptr syncWait(F &&iFunc)
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
std::string const & category() const
void beginStream(StreamID id, StreamContext &streamContext)
tbb::task_group * group() const noexcept
void endStream(StreamID id, StreamContext &streamContext)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
void doneWaiting(std::exception_ptr iExcept)