|
|
Go to the documentation of this file. 1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
63 #include <unordered_map>
68 class ProductResolverIndexHelper;
69 class ProductResolverIndexAndSkipBit;
72 class ProductRegistry;
73 class ThinnedAssociationsHelper;
75 namespace workerhelper {
79 namespace eventsetup {
129 template <
typename T>
130 bool doWork(
typename T::MyPrincipal
const&,
134 typename T::Context
const* context);
142 template <
typename T>
144 typename T::MyPrincipal
const&,
149 typename T::Context
const* context);
151 template <
typename T>
153 typename T::MyPrincipal
const&,
158 typename T::Context
const* context);
160 template <
typename T>
165 typename T::Context
const* context);
203 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const&
207 std::vector<ModuleDescription const*>&
modules,
209 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
213 virtual std::vector<ConsumesInfo>
consumesInfo()
const = 0;
218 timesRun_.store(0, std::memory_order_release);
239 template <
typename O>
287 template <
typename T>
288 bool runModule(
typename T::MyPrincipal
const&,
292 typename T::Context
const* context);
325 template <
typename T>
330 std::ostringstream iost;
344 template <
bool IS_EVENT>
353 template <
bool IS_EVENT>
362 template <
bool IS_EVENT>
380 template <
typename T>
382 typename T::MyPrincipal
const&
ep,
386 typename T::Context
const* context);
401 template <
typename T>
405 typename T::MyPrincipal
const&
ep,
410 typename T::Context
const* context)
439 std::exception_ptr temp_excptr;
441 if constexpr (T::isEvent_) {
448 temp_excptr = std::current_exception();
450 excptr = &temp_excptr;
474 std::exception_ptr* ptr =
nullptr;
475 worker->template runModuleAfterAsyncPrefetch<T>(ptr, principal, es, streamID, parentContext, sContext);
480 gQueue->push([
queue, gQueue,
f]()
mutable {
509 template <
typename T,
typename DUMMY =
void>
513 typename T::MyPrincipal
const&
ep,
521 template <
typename DUMMY>
533 m_parentContext(parentContext),
535 m_serviceToken(
token) {}
543 std::exception_ptr temp_excptr;
548 m_worker->emitPostModuleEventPrefetchingSignal();
550 temp_excptr = std::current_exception();
552 excptr = &temp_excptr;
557 if (
auto queue = m_worker->serializeRunModule()) {
558 auto const& principal = m_principal;
560 queue.push([worker = m_worker,
563 parentContext = m_parentContext,
564 serviceToken = m_serviceToken,
565 holder = m_holder]() {
569 std::exception_ptr* ptr =
nullptr;
570 worker->runAcquireAfterAsyncPrefetch(ptr, principal, es, parentContext, holder);
576 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_principal, m_es, m_parentContext,
std::move(m_holder));
628 template <
typename T>
629 class ModuleSignalSentry {
632 typename T::Context
const* context,
634 : a_(
a), context_(context), moduleCallingContext_(moduleCallingContext) {
636 T::preModuleSignal(a_, context, moduleCallingContext_);
639 ~ModuleSignalSentry() {
641 T::postModuleSignal(a_, context_, moduleCallingContext_);
646 typename T::Context
const* context_;
652 namespace workerhelper {
665 return iWorker->
implDo(
ep, es, mcc);
685 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
704 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
723 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
742 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
762 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
781 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
801 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
820 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
830 template <
typename T>
832 typename T::MyPrincipal
const&
ep,
837 typename T::Context
const* context) {
843 bool expected =
false;
844 bool workStarted =
workStarted_.compare_exchange_strong(expected,
true);
847 if constexpr (T::isEvent_) {
866 auto p = m_task.load();
868 tbb::task::destroy(*
p);
873 auto t = m_task.load();
874 m_task.store(
nullptr);
878 std::atomic<edm::WaitingTask*> m_task;
881 auto ownRunTask = std::make_shared<DestroyTask>(runTask);
884 [ownRunTask, parentContext, &
ep,
token,
this](std::exception_ptr
const*)
mutable {
892 if constexpr (T::isEvent_) {
896 moduleTask =
new (tbb::task::allocate_root())
905 template <
typename T>
907 typename T::MyPrincipal
const&
ep,
911 typename T::Context
const* context) {
912 std::exception_ptr exceptionPtr;
917 exceptionPtr = *iEPtr;
918 setException<T::isEvent_>(exceptionPtr);
920 setPassed<T::isEvent_>();
925 CMS_SA_ALLOW try { runModule<T>(
ep, es, streamID, parentContext, context); }
catch (...) {
926 exceptionPtr = std::current_exception();
933 template <
typename T>
935 typename T::MyPrincipal
const& principal,
940 typename T::Context
const* context) {
946 bool expected =
false;
947 auto workStarted =
workStarted_.compare_exchange_strong(expected,
true);
951 auto toDo = [
this, &principal, &es, streamID, parentContext, context, serviceToken]() {
952 std::exception_ptr exceptionPtr;
958 this->runModule<T>(principal, es, streamID, parentContext, context);
960 exceptionPtr = std::current_exception();
968 tbb::task::spawn(*taskToDo);
973 template <
typename T>
978 typename T::Context
const* context) {
979 if constexpr (T::isEvent_) {
996 bool expected =
false;
997 if (not
workStarted_.compare_exchange_strong(expected,
true)) {
1000 waitTask->increment_ref_count();
1004 waitTask->wait_for_all();
1027 if constexpr (T::isEvent_) {
1031 waitTask->set_ref_count(2);
1033 waitTask->decrement_ref_count();
1034 waitTask->wait_for_all();
1050 waitTask->set_ref_count(2);
1053 waitTask->decrement_ref_count();
1054 waitTask->wait_for_all();
1056 if (waitTask->exceptionPtr() !=
nullptr) {
1059 setException<T::isEvent_>(*waitTask->exceptionPtr());
1063 setPassed<T::isEvent_>();
1071 prefetchSentry.release();
1074 queue.pushAndWait([&]() {
1078 CMS_SA_ALLOW try { rc = runModule<T>(
ep, es, streamID, parentContext, context); }
catch (...) {
1083 CMS_SA_ALLOW try { rc = runModule<T>(
ep, es, streamID, parentContext, context); }
catch (...) {
1095 template <
typename T>
1100 typename T::Context
const* context) {
1106 if constexpr (T::isEvent_) {
1107 timesRun_.fetch_add(1, std::memory_order_relaxed);
1116 setPassed<T::isEvent_>();
1118 setFailed<T::isEvent_>();
1126 setException<T::isEvent_>(std::current_exception());
1129 rc = setPassed<T::isEvent_>();
1136 template <
typename T>
1141 typename T::Context
const* context) {
1143 std::exception_ptr
const* prefetchingException =
nullptr;
1144 return runModuleAfterAsyncPrefetch<T>(prefetchingException,
ep, es, streamID, parentContext, context);
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
virtual std::vector< ConsumesInfo > consumesInfo() const =0
tbb::task * execute() override
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
EventSetupImpl const & m_es
tbb::task * execute() override
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::shared_ptr< ActivityRegistry > actReg_
virtual bool wantsGlobalRuns() const =0
virtual void implEndJob()=0
virtual bool implDoEnd(RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
void runAcquire(EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
std::atomic< int > timesPassed_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
LimitedTaskQueue * limited_
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
bool resume()
Resumes processing if the queue was paused.
std::atomic< int > timesVisited_
void add(WaitingTask *)
Adds task to the waiting list.
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool wantsGlobalLuminosityBlocks() const =0
ServiceToken m_serviceToken
void resetModuleDescription(ModuleDescription const *)
void postDoEvent(EventPrincipal const &)
ExceptionToActionTable const * actions_
virtual void implBeginJob()=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
void respondToOpenInputFile(FileBlock const &fb)
std::exception_ptr setException(std::exception_ptr iException)
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
std::atomic< int > numberOfPathsLeftToRun_
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
ServiceToken m_serviceToken
tbb::task * execute() override
bool runModule(typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
ModuleDescription const * moduleDescription() const
const ParentContext m_parentContext
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
static bool needToRunSelection(Worker const *iWorker)
virtual bool wantsStreamLuminosityBlocks() const =0
static bool needToRunSelection(Worker const *iWorker)
virtual void implEndStream(StreamID)=0
std::atomic< bool > workStarted_
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Worker & operator=(Worker const &)=delete
void reset()
Resets access to the resource so that added tasks will wait.
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool wantsTransition(Worker const *iWorker)
TransitionIDValue(T const &iP)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
std::atomic< int > timesRun_
static bool needToRunSelection(Worker const *iWorker)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void callWhenDoneAsync(WaitingTask *task)
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static bool wantsTransition(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
auto wrap(F iFunc) -> decltype(iFunc())
static bool wantsTransition(Worker const *iWorker)
virtual void modulesWhoseProductsAreConsumed(std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
std::atomic< State > state_
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
EventSetupImpl const & m_es
EventPrincipal const & m_principal
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ModuleDescription const * descPtr() const
const ParentContext m_parentContext
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
static bool needToRunSelection(Worker const *iWorker)
static bool needToRunSelection(Worker const *iWorker)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
std::atomic< int > timesExcept_
static bool needToRunSelection(Worker const *iWorker)
const ParentContext m_parentContext
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
virtual TaskQueueAdaptor serializeRunModule()=0
static bool wantsTransition(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
edm::WaitingTaskList waitingTasks_
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ServiceToken presentToken() const
StreamContext const * getStreamContext() const
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static SerialTaskQueue * pauseGlobalQueue(Worker *)
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
WaitingTask * m_runModuleTask
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
static bool wantsTransition(Worker const *iWorker)
virtual std::string value() const =0
static SerialTaskQueue * enableGlobalQueue(Worker *)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
virtual bool implDoBegin(RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
bool ranAcquireWithoutException_
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
AcquireTask(Worker *worker, EventPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
TaskQueueAdaptor()=default
ModuleCallingContext moduleCallingContext_
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
void respondToCloseInputFile(FileBlock const &fb)
std::atomic< int > timesFailed_
static bool needToRunSelection(Worker const *iWorker)
ModuleDescription const & description() const
std::exception_ptr cached_exception_
static ServiceRegistry & instance()
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
EnableQueueGuard(EnableQueueGuard &&iGuard)
virtual bool wantsStreamRuns() const =0
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
AcquireTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
virtual bool hasAcquire() const =0
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
static bool wantsTransition(Worker const *iWorker)
virtual SerialTaskQueue * globalRunsQueue()=0
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static SerialTaskQueue * enableGlobalQueue(Worker *)
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
virtual bool hasAccumulator() const =0
void skipOnPath(EventPrincipal const &iEvent)
ActivityRegistry * activityRegistry()
EnableQueueGuard(SerialTaskQueue *iQueue)
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
SerialTaskQueueChain * serial_
bool doWork(typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
virtual void implDoAcquire(EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
virtual bool implDo(EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
virtual void implBeginStream(StreamID)=0
std::string value() const override
T::Context const * m_context
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void emitPostModuleEventPrefetchingSignal()
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
tbb::task * execute() override
WaitingTaskWithArenaHolder m_holder
T::MyPrincipal const & m_principal
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual ~TransitionIDValueBase()
#define CMS_THREAD_GUARD(_var_)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
void beginStream(StreamID id, StreamContext &streamContext)
virtual bool implNeedToRunSelection() const =0
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void endStream(StreamID id, StreamContext &streamContext)
virtual std::string workerType() const =0
TaskQueueAdaptor(LimitedTaskQueue *iLimited)