13 class ModuleBeginJobSignalSentry {
15 ModuleBeginJobSignalSentry(
ActivityRegistry*
a, ModuleDescription
const& md):a_(a), md_(&md) {
16 if(a_) a_->preModuleBeginJobSignal_(*md_);
18 ~ModuleBeginJobSignalSentry() {
19 if(a_) a_->postModuleBeginJobSignal_(*md_);
23 ModuleDescription
const* md_;
26 class ModuleEndJobSignalSentry {
28 ModuleEndJobSignalSentry(
ActivityRegistry* a, ModuleDescription
const& md):a_(a), md_(&md) {
29 if(a_) a_->preModuleEndJobSignal_(*md_);
31 ~ModuleEndJobSignalSentry() {
32 if(a_) a_->postModuleEndJobSignal_(*md_);
36 ModuleDescription
const* md_;
39 class ModuleBeginStreamSignalSentry {
42 StreamContext
const&
sc,
43 ModuleCallingContext
const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
44 if(a_) a_->preModuleBeginStreamSignal_(sc_, mcc_);
46 ~ModuleBeginStreamSignalSentry() {
47 if(a_) a_->postModuleBeginStreamSignal_(sc_, mcc_);
51 StreamContext
const& sc_;
52 ModuleCallingContext
const& mcc_;
55 class ModuleEndStreamSignalSentry {
58 StreamContext
const& sc,
59 ModuleCallingContext
const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
60 if(a_) a_->preModuleEndStreamSignal_(sc_, mcc_);
62 ~ModuleEndStreamSignalSentry() {
63 if(a_) a_->postModuleEndStreamSignal_(sc_, mcc_);
67 StreamContext
const& sc_;
68 ModuleCallingContext
const& mcc_;
82 numberOfPathsLeftToRun_(0),
83 moduleCallingContext_(&iMD),
89 ranAcquireWithoutException_(
false)
108 std::ostringstream iost;
110 iost <<
"Prefetching for module ";
112 iost <<
"Calling method for module ";
118 iost <<
" (probably inside some kind of mixing module)";
125 std::ostringstream ost;
127 ost <<
"Prefetching for module ";
129 ost <<
"Calling method for module ";
137 ost <<
"Running path '";
175 std::rethrow_exception(iPtr);
218 iTask->increment_ref_count();
219 for(
auto const& item : items) {
221 bool skipCurrentProcess = item.skipCurrentProcess();
231 if(0 == iTask->decrement_ref_count()) {
233 tbb::task::spawn(*iTask);
241 successTask->increment_ref_count();
244 [
id,successTask,iPrincipal,
this,token](std::exception_ptr
const*) {
248 timesRun_.fetch_add(1,std::memory_order_relaxed);
252 if ( 0 == successTask->decrement_ref_count() ) {
258 if(0 == successTask->decrement_ref_count()) {
259 tbb::task::spawn(*successTask);
265 std::vector<ProductResolverIndexAndSkipBit>
items;
268 for(
auto const& item : items) {
270 bool skipCurrentProcess = item.skipCurrentProcess();
275 choiceHolder.doneWaiting(std::exception_ptr{});
298 std::ostringstream ost;
314 std::ostringstream ost;
338 std::ostringstream ost;
362 std::ostringstream ost;
396 timesRun_.fetch_add(1,std::memory_order_relaxed);
408 std::exception_ptr exceptionPtr;
413 exceptionPtr = *iEPtr;
421 exceptionPtr = std::current_exception();
433 std::rethrow_exception(*iEPtr);
438 return std::current_exception();
449 m_runModuleTask(runModuleTask),
450 m_parentContext(parentContext) {
std::string const & pathName() const
GlobalContext const * globalContext() const
void resetModuleDescription(ModuleDescription const *)
void setTimestamp(Timestamp const &v)
ModuleDescription const & description() const
void setState(State state)
std::atomic< int > numberOfPathsLeftToRun_
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
ModuleCallingContext const * moduleCallingContext() const
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
ModuleCallingContext const * getTopModuleCallingContext() const
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
ParentContext const m_parentContext
InternalContext const * internalContext() const
WaitingTask * m_runModuleTask
std::shared_ptr< ActivityRegistry > actReg_
std::string const & moduleName() const
std::string const & category() const
exception_actions::ActionCodes find(const std::string &category) const
ModuleCallingContext const * moduleCallingContext() const
void beginStream(StreamID id, StreamContext &streamContext)
void exceptionContext(std::ostream &, GlobalContext const &)
ExceptionToActionTable const * actions_
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
ParentContext const & parent() const
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
ModuleCallingContext moduleCallingContext_
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
void setTransition(Transition v)
std::exception_ptr cached_exception_
static RunIndex invalidRunIndex()
tbb::task * execute() override
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void doneWaiting(std::exception_ptr iExcept)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
edm::WaitingTaskList waitingTasks_
void doneWaiting(std::exception_ptr iExcept)
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
BranchType const & branchType() const
PathContext const * pathContext() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
ModuleDescription const * moduleDescription() const
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
std::atomic< State > state_
void runAcquire(EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
virtual void implEndJob()=0
StreamContext const * streamContext() const
static LuminosityBlockIndex invalidLuminosityBlockIndex()
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
virtual void implBeginJob()=0
virtual void implDoAcquire(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void implEndStream(StreamID)=0
StreamContext const * streamContext() const
void setEventID(EventID const &v)
ModuleCallingContext const * previousModuleOnThread() const
void endStream(StreamID id, StreamContext &streamContext)
void postDoEvent(EventPrincipal const &)
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::atomic< int > timesRun_
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)