|
|
Go to the documentation of this file.
13 class ModuleBeginJobSignalSentry {
15 ModuleBeginJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
17 a_->preModuleBeginJobSignal_(*md_);
19 ~ModuleBeginJobSignalSentry() {
21 a_->postModuleBeginJobSignal_(*md_);
26 ModuleDescription
const* md_;
29 class ModuleEndJobSignalSentry {
31 ModuleEndJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md) : a_(
a), md_(&md) {
33 a_->preModuleEndJobSignal_(*md_);
35 ~ModuleEndJobSignalSentry() {
37 a_->postModuleEndJobSignal_(*md_);
42 ModuleDescription
const* md_;
45 class ModuleBeginStreamSignalSentry {
47 ModuleBeginStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
48 : a_(
a), sc_(sc), mcc_(mcc) {
50 a_->preModuleBeginStreamSignal_(sc_, mcc_);
52 ~ModuleBeginStreamSignalSentry() {
54 a_->postModuleBeginStreamSignal_(sc_, mcc_);
59 StreamContext
const& sc_;
60 ModuleCallingContext
const& mcc_;
63 class ModuleEndStreamSignalSentry {
65 ModuleEndStreamSignalSentry(
ActivityRegistry*
a, StreamContext
const& sc, ModuleCallingContext
const& mcc)
66 : a_(
a), sc_(sc), mcc_(mcc) {
68 a_->preModuleEndStreamSignal_(sc_, mcc_);
70 ~ModuleEndStreamSignalSentry() {
72 a_->postModuleEndStreamSignal_(sc_, mcc_);
77 StreamContext
const& sc_;
78 ModuleCallingContext
const& mcc_;
91 numberOfPathsLeftToRun_(0),
92 moduleCallingContext_(&iMD),
96 earlyDeleteHelper_(nullptr),
98 ranAcquireWithoutException_(
false) {}
107 std::ostringstream iost;
109 iost <<
"Prefetching for module ";
111 iost <<
"Calling method for module ";
116 iost <<
" (probably inside some kind of mixing module)";
123 std::ostringstream ost;
125 ost <<
"Prefetching for module ";
127 ost <<
"Calling method for module ";
134 ost <<
"Running path '";
213 iTask->increment_ref_count();
216 bool skipCurrentProcess =
item.skipCurrentProcess();
226 if (0 == iTask->decrement_ref_count()) {
228 tbb::task::spawn(*iTask);
236 successTask->increment_ref_count();
239 tbb::task::allocate_root(), [
id, successTask, iPrincipal,
this,
token](std::exception_ptr
const*) {
244 timesRun_.fetch_add(1, std::memory_order_relaxed);
248 if (0 == successTask->decrement_ref_count()) {
249 tbb::task::destroy(*successTask);
255 if (0 == successTask->decrement_ref_count()) {
256 tbb::task::spawn(*successTask);
262 std::vector<ProductResolverIndexAndSkipBit>
items;
267 bool skipCurrentProcess =
item.skipCurrentProcess();
272 choiceHolder.doneWaiting(std::exception_ptr{});
293 std::ostringstream ost;
308 std::ostringstream ost;
331 std::ostringstream ost;
355 std::ostringstream ost;
389 timesRun_.fetch_add(1, std::memory_order_relaxed);
401 std::exception_ptr exceptionPtr;
406 exceptionPtr = *iEPtr;
415 exceptionPtr = std::current_exception();
430 return std::current_exception();
439 : m_worker(worker), m_runModuleTask(runModuleTask), m_parentContext(parentContext) {}
442 auto excptr = exceptionPtr();
445 m_runModuleTask->set_ref_count(1);
447 holder.
doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
449 m_runModuleTask->set_ref_count(0);
451 return m_runModuleTask;
tbb::task * execute() override
std::string const & moduleLabel() const
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
ModuleCallingContext const * moduleCallingContext() const
void exceptionContext(std::ostream &, GlobalContext const &)
unsigned int ProductResolverIndex
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
void addContext(std::string const &context)
StreamContext const * streamContext() const
std::shared_ptr< ActivityRegistry > actReg_
virtual void implEndJob()=0
void setTransition(Transition v)
void runAcquire(EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
PlaceInPathContext const * placeInPathContext() const
GlobalContext const * globalContext() const
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void resetModuleDescription(ModuleDescription const *)
std::string const & moduleName() const
void postDoEvent(EventPrincipal const &)
ExceptionToActionTable const * actions_
void setTimestamp(Timestamp const &v)
StreamContext const * streamContext() const
virtual void implBeginJob()=0
PathContext const * pathContext() const
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
std::atomic< int > numberOfPathsLeftToRun_
ModuleDescription const * moduleDescription() const
void doneWaiting(std::exception_ptr iExcept)
virtual void implEndStream(StreamID)=0
ParentContext const & parent() const
ModuleCallingContext const * moduleCallingContext() const
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
std::atomic< int > timesRun_
InternalContext const * internalContext() const
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< State > state_
BranchType const & branchType() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
edm::WaitingTaskList waitingTasks_
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleCallingContext const * previousModuleOnThread() const
StreamContext const * getStreamContext() const
void setState(State state)
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
bool ranAcquireWithoutException_
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
void setEventID(EventID const &v)
ModuleCallingContext moduleCallingContext_
ModuleDescription const & description() const
std::exception_ptr cached_exception_
static RunIndex invalidRunIndex()
ModuleCallingContext const * getTopModuleCallingContext() const
std::string const & pathName() const
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void setRunIndex(RunIndex const &v)
void skipOnPath(EventPrincipal const &iEvent)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
virtual void implDoAcquire(EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginStream(StreamID)=0
std::string const & category() const
void beginStream(StreamID id, StreamContext &streamContext)
void endStream(StreamID id, StreamContext &streamContext)
void doneWaiting(std::exception_ptr iExcept)