|
|
Go to the documentation of this file.
17 #include "tbb/global_control.h"
21 class ModuleBeginJobSignalSentry {
23 ModuleBeginJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
25 a_->preModuleBeginJobSignal_(*md_);
27 ~ModuleBeginJobSignalSentry() {
29 a_->postModuleBeginJobSignal_(*md_);
34 ModuleDescription
const* md_;
37 class ModuleEndJobSignalSentry {
39 ModuleEndJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
41 a_->preModuleEndJobSignal_(*md_);
43 ~ModuleEndJobSignalSentry() {
45 a_->postModuleEndJobSignal_(*md_);
50 ModuleDescription
const* md_;
53 class ModuleBeginStreamSignalSentry {
55 ModuleBeginStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
56 : a_(
a), sc_(sc), mcc_(mcc) {
58 a_->preModuleBeginStreamSignal_(sc_, mcc_);
60 ~ModuleBeginStreamSignalSentry() {
62 a_->postModuleBeginStreamSignal_(sc_, mcc_);
67 StreamContext
const& sc_;
68 ModuleCallingContext
const& mcc_;
71 class ModuleEndStreamSignalSentry {
73 ModuleEndStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
74 : a_(
a), sc_(sc), mcc_(mcc) {
76 a_->preModuleEndStreamSignal_(sc_, mcc_);
78 ~ModuleEndStreamSignalSentry() {
80 a_->postModuleEndStreamSignal_(sc_, mcc_);
85 StreamContext
const& sc_;
86 ModuleCallingContext
const& mcc_;
99 numberOfPathsLeftToRun_(0),
100 moduleCallingContext_(&iMD),
104 earlyDeleteHelper_(nullptr),
106 ranAcquireWithoutException_(
false) {}
115 std::ostringstream iost;
117 iost <<
"Prefetching for module ";
119 iost <<
"Calling method for module ";
124 iost <<
" (probably inside some kind of mixing module)";
131 std::ostringstream ost;
133 ost <<
"Prefetching for module ";
135 ost <<
"Calling method for module ";
142 ost <<
"Running path '";
208 successTask->increment_ref_count();
211 tbb::task::allocate_root(), [
id, successTask, iPrincipal,
this,
token](std::exception_ptr
const*) {
216 timesRun_.fetch_add(1, std::memory_order_relaxed);
220 if (0 == successTask->decrement_ref_count()) {
221 tbb::task::destroy(*successTask);
227 if (0 == successTask->decrement_ref_count()) {
228 tbb::task::spawn(*successTask);
234 std::vector<ProductResolverIndexAndSkipBit>
items;
239 bool skipCurrentProcess =
item.skipCurrentProcess();
244 choiceHolder.doneWaiting(std::exception_ptr{});
267 if UNLIKELY (tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) == 1) {
269 tbb::task_arena edArena(tbb::task_arena::attach{});
277 waitTask->set_ref_count(2);
278 for (
size_t i = 0;
i !=
items.size(); ++
i) {
286 waitTask->decrement_ref_count();
287 waitTask->wait_for_all();
289 auto exPtr = waitTask->exceptionPtr();
290 tbb::task_arena(edArena).execute([
task, exPtr]() {
293 t.doneWaiting(*exPtr);
295 t.doneWaiting(std::exception_ptr{});
306 holder.doneWaiting(*iExcept);
308 holder.doneWaiting(std::exception_ptr{});
314 for (
size_t i = 0;
i !=
items.size(); ++
i) {
332 bool skipCurrentProcess =
item.skipCurrentProcess();
357 std::ostringstream ost;
372 std::ostringstream ost;
395 std::ostringstream ost;
419 std::ostringstream ost;
460 timesRun_.fetch_add(1, std::memory_order_relaxed);
471 std::exception_ptr exceptionPtr;
475 exceptionPtr = *iEPtr;
481 runAcquire(eventTransitionInfo, parentContext, holder);
484 exceptionPtr = std::current_exception();
499 return std::current_exception();
508 : m_worker(worker), m_runModuleTask(runModuleTask), m_parentContext(parentContext) {}
511 auto excptr = exceptionPtr();
514 m_runModuleTask->set_ref_count(1);
516 holder.
doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
518 m_runModuleTask->set_ref_count(0);
520 return m_runModuleTask;
tbb::task * execute() override
std::string const & moduleLabel() const
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
ModuleCallingContext const * moduleCallingContext() const
void exceptionContext(std::ostream &, GlobalContext const &)
unsigned int ProductResolverIndex
void esPrefetchAsync(WaitingTask *, EventSetupImpl const &, Transition, ServiceToken const &)
void addContext(std::string const &context)
StreamContext const * streamContext() const
std::shared_ptr< ActivityRegistry > actReg_
virtual void implEndJob()=0
void setTransition(Transition v)
PlaceInPathContext const * placeInPathContext() const
GlobalContext const * globalContext() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void resetModuleDescription(ModuleDescription const *)
std::string const & moduleName() const
void postDoEvent(EventPrincipal const &)
ExceptionToActionTable const * actions_
void setTimestamp(Timestamp const &v)
StreamContext const * streamContext() const
virtual void implBeginJob()=0
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
PathContext const * pathContext() const
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
std::atomic< int > numberOfPathsLeftToRun_
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
ModuleDescription const * moduleDescription() const
tbb::task_arena & esTaskArena()
void doneWaiting(std::exception_ptr iExcept)
virtual void implEndStream(StreamID)=0
ParentContext const & parent() const
void prefetchAsync(WaitingTask *iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &) const
prefetch the data to setup for subsequent calls to getImplementation
ModuleCallingContext const * moduleCallingContext() const
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
void edPrefetchAsync(WaitingTask *, ServiceToken const &, Principal const &) const
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
std::atomic< int > timesRun_
InternalContext const * internalContext() const
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< State > state_
BranchType const & branchType() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
edm::WaitingTaskList waitingTasks_
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleCallingContext const * previousModuleOnThread() const
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setState(State state)
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
bool ranAcquireWithoutException_
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
void setEventID(EventID const &v)
ModuleCallingContext moduleCallingContext_
ModuleDescription const & description() const
std::exception_ptr cached_exception_
static RunIndex invalidRunIndex()
ModuleCallingContext const * getTopModuleCallingContext() const
std::string const & pathName() const
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void setRunIndex(RunIndex const &v)
void skipOnPath(EventPrincipal const &iEvent)
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginStream(StreamID)=0
std::string const & category() const
void beginStream(StreamID id, StreamContext &streamContext)
void endStream(StreamID id, StreamContext &streamContext)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
void doneWaiting(std::exception_ptr iExcept)