|
|
Go to the documentation of this file. 1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
68 #include <unordered_map>
74 class ModuleProcessName;
75 class ProductResolverIndexHelper;
76 class ProductResolverIndexAndSkipBit;
79 class ProductRegistry;
80 class ThinnedAssociationsHelper;
82 namespace workerhelper {
86 namespace eventsetup {
105 void push(tbb::task_group& iG,
F&& iF) {
143 template <
typename T>
145 typename T::TransitionInfoType
const&,
149 typename T::Context
const*);
151 template <
typename T>
153 typename T::TransitionInfoType
const&,
157 typename T::Context
const*);
159 template <
typename T>
163 typename T::Context
const*);
203 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const&
208 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
210 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
214 virtual std::vector<ConsumesInfo>
consumesInfo()
const = 0;
219 timesRun_.store(0, std::memory_order_release);
240 template <
typename O>
277 template <
typename T>
304 template <
bool IS_EVENT>
313 template <
bool IS_EVENT>
322 template <
bool IS_EVENT>
332 template <
typename T>
336 typename T::TransitionInfoType
const&,
352 template <
typename T>
354 typename T::TransitionInfoType
const&,
357 typename T::Context
const*);
368 template <
typename T>
372 typename T::TransitionInfoType
const& transitionInfo,
376 typename T::Context
const* context,
377 tbb::task_group* iGroup)
406 std::exception_ptr temp_excptr;
408 if constexpr (T::isEvent_) {
415 temp_excptr = std::current_exception();
417 excptr = &temp_excptr;
438 std::exception_ptr* ptr =
nullptr;
439 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
info, streamID, parentContext, sContext);
472 template <
typename T,
typename DUMMY =
void>
476 typename T::TransitionInfoType
const&,
483 template <
typename DUMMY>
492 m_eventTransitionInfo(eventTransitionInfo),
493 m_parentContext(parentContext),
495 m_serviceToken(
token) {}
503 std::exception_ptr temp_excptr;
508 m_worker->emitPostModuleEventPrefetchingSignal();
510 temp_excptr = std::current_exception();
512 excptr = &temp_excptr;
517 if (
auto queue = m_worker->serializeRunModule()) {
518 queue.push(*m_holder.group(),
520 info = m_eventTransitionInfo,
521 parentContext = m_parentContext,
522 serviceToken = m_serviceToken,
523 holder = m_holder]() {
527 std::exception_ptr* ptr =
nullptr;
528 worker->runAcquireAfterAsyncPrefetch(ptr,
info, parentContext, holder);
534 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext,
std::move(m_holder));
551 tbb::task_group*
group,
589 template <
typename T>
590 class ModuleSignalSentry {
593 typename T::Context
const* context,
595 : a_(
a), context_(context), moduleCallingContext_(moduleCallingContext) {
597 T::preModuleSignal(a_, context, moduleCallingContext_);
600 ~ModuleSignalSentry() {
602 T::postModuleSignal(a_, context_, moduleCallingContext_);
607 typename T::Context
const* context_;
613 namespace workerhelper {
651 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
676 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
701 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
726 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
752 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
777 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
803 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
828 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
853 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
873 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
893 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
905 template <
typename T>
909 typename T::TransitionInfoType
const& transitionInfo,
911 Principal const& principal = transitionInfo.principal();
927 template <
typename T>
929 typename T::TransitionInfoType
const& transitionInfo,
933 typename T::Context
const* context) {
939 bool expected =
false;
940 bool workStarted =
workStarted_.compare_exchange_strong(expected,
true);
943 if constexpr (T::isEvent_) {
962 auto p = m_task.exchange(
nullptr);
971 std::atomic<edm::WaitingTask*> m_task;
974 auto ownRunTask = std::make_shared<DestroyTask>(runTask);
978 std::exception_ptr
const*)
mutable {
991 if constexpr (T::isEvent_) {
1003 template <
typename T>
1005 typename T::TransitionInfoType
const& transitionInfo,
1008 typename T::Context
const* context) {
1009 std::exception_ptr exceptionPtr;
1013 exceptionPtr = *iEPtr;
1014 setException<T::isEvent_>(exceptionPtr);
1016 setPassed<T::isEvent_>();
1021 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); }
catch (...) {
1022 exceptionPtr = std::current_exception();
1026 return exceptionPtr;
1029 template <
typename T>
1031 typename T::TransitionInfoType
const& transitionInfo,
1035 typename T::Context
const* context) {
1041 bool expected =
false;
1042 auto workStarted =
workStarted_.compare_exchange_strong(expected,
true);
1047 auto toDo = [
this,
info = transitionInfo, streamID, parentContext, context, weakToken]() {
1048 std::exception_ptr exceptionPtr;
1054 this->runModule<T>(
info, streamID, parentContext, context);
1056 exceptionPtr = std::current_exception();
1063 auto afterPrefetch =
1076 WaitingTaskHolder(*
group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1088 template <
typename T>
1092 typename T::Context
const* context) {
1098 if constexpr (T::isEvent_) {
1099 timesRun_.fetch_add(1, std::memory_order_relaxed);
1109 setPassed<T::isEvent_>();
1111 setFailed<T::isEvent_>();
1118 setException<T::isEvent_>(std::current_exception());
1121 rc = setPassed<T::isEvent_>();
1128 template <
typename T>
1132 typename T::Context
const* context) {
1134 std::exception_ptr
const* prefetchingException =
nullptr;
1135 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
static bool wantsTransition(Worker const *iWorker)
virtual std::vector< ConsumesInfo > consumesInfo() const =0
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
static SerialTaskQueue * enableGlobalQueue(Worker *)
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
std::shared_ptr< ActivityRegistry > actReg_
virtual bool wantsGlobalRuns() const =0
virtual void implEndJob()=0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
static bool needToRunSelection(Worker const *iWorker)
std::atomic< int > timesPassed_
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
LimitedTaskQueue * limited_
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
bool resume()
Resumes processing if the queue was paused.
std::atomic< int > timesVisited_
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool wantsGlobalLuminosityBlocks() const =0
void resetModuleDescription(ModuleDescription const *)
virtual bool wantsInputProcessBlocks() const =0
void postDoEvent(EventPrincipal const &)
ExceptionToActionTable const * actions_
virtual void implBeginJob()=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
HandleExternalWorkExceptionTask(Worker *worker, tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, tbb::task_group *iGroup)
void respondToOpenInputFile(FileBlock const &fb)
std::exception_ptr setException(std::exception_ptr iException)
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
std::atomic< int > numberOfPathsLeftToRun_
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
ModuleDescription const * moduleDescription() const
const ParentContext m_parentContext
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
ServiceWeakToken m_serviceToken
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
static bool needToRunSelection(Worker const *iWorker)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
virtual bool wantsStreamLuminosityBlocks() const =0
static bool wantsTransition(Worker const *iWorker)
static bool needToRunSelection(Worker const *iWorker)
virtual void implEndStream(StreamID)=0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
std::atomic< bool > workStarted_
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Worker & operator=(Worker const &)=delete
void reset()
Resets access to the resource so that added tasks will wait.
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool wantsTransition(Worker const *iWorker)
ModuleDescription const * description() const
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
static bool needToRunSelection(Worker const *iWorker)
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
static bool needToRunSelection(Worker const *iWorker)
std::atomic< int > timesRun_
static bool needToRunSelection(Worker const *iWorker)
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static bool wantsTransition(Worker const *iWorker)
auto wrap(F iFunc) -> decltype(iFunc())
static bool wantsTransition(Worker const *iWorker)
std::atomic< State > state_
BranchType const & branchType() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
virtual void doClearModule()=0
const ParentContext m_parentContext
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
static bool needToRunSelection(Worker const *iWorker)
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
static bool needToRunSelection(Worker const *iWorker)
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
std::atomic< int > timesExcept_
void push(tbb::task_group &iG, F &&iF)
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
static bool needToRunSelection(Worker const *iWorker)
const ParentContext m_parentContext
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
virtual TaskQueueAdaptor serializeRunModule()=0
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
static bool wantsTransition(Worker const *iWorker)
virtual bool wantsProcessBlocks() const =0
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
edm::WaitingTaskList waitingTasks_
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
FunctorWaitingTask< F > * make_waiting_task(F f)
tbb::task_group * m_group
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
StreamContext const * getStreamContext() const
T::TransitionInfoType m_transitionInfo
static SerialTaskQueue * pauseGlobalQueue(Worker *)
WaitingTask * m_runModuleTask
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
bool ranAcquireWithoutException_
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
TaskQueueAdaptor()=default
ModuleCallingContext moduleCallingContext_
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
ServiceToken lock() const
void respondToCloseInputFile(FileBlock const &fb)
static SerialTaskQueue * enableGlobalQueue(Worker *)
void callWhenDoneAsync(WaitingTaskHolder task)
std::atomic< int > timesFailed_
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
std::exception_ptr cached_exception_
tbb::task_group * m_group
bool needsESPrefetching(Transition iTrans) const noexcept
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
virtual Types moduleType() const =0
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
EnableQueueGuard(EnableQueueGuard &&iGuard)
virtual bool wantsStreamRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual bool hasAcquire() const =0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
void add(tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual SerialTaskQueue * globalRunsQueue()=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *)
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
virtual bool hasAccumulator() const =0
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void skipOnPath(EventPrincipal const &iEvent)
ActivityRegistry * activityRegistry()
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const * > *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
EnableQueueGuard(SerialTaskQueue *iQueue)
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
SerialTaskQueueChain * serial_
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
virtual void implBeginStream(StreamID)=0
T::Context const * m_context
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void emitPostModuleEventPrefetchingSignal()
static SerialTaskQueue * enableGlobalQueue(Worker *)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
WaitingTaskWithArenaHolder m_holder
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
EventTransitionInfo m_eventTransitionInfo
#define CMS_THREAD_GUARD(_var_)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
void beginStream(StreamID id, StreamContext &streamContext)
virtual bool implNeedToRunSelection() const =0
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
ServiceWeakToken m_serviceToken
void endStream(StreamID id, StreamContext &streamContext)
virtual std::string workerType() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)