1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 70 #include <unordered_map> 76 class ModuleProcessName;
77 class ProductResolverIndexHelper;
78 class ProductResolverIndexAndSkipBit;
81 class ProductRegistry;
82 class ThinnedAssociationsHelper;
84 namespace workerhelper {
88 namespace eventsetup {
108 void push(oneapi::tbb::task_group& iG,
F&& iF) {
146 template <
typename T>
148 typename T::TransitionInfoType
const&,
152 typename T::Context
const*);
154 template <
typename T>
156 typename T::TransitionInfoType
const&,
160 typename T::Context
const*);
162 template <
typename T>
166 typename T::Context
const*);
170 size_t iTransformIndex,
216 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const&
221 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
223 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
227 virtual std::vector<ConsumesInfo>
consumesInfo()
const = 0;
233 timesRun_.store(0, std::memory_order_release);
257 template <
typename O>
273 size_t iTransformIndex,
301 template <
typename T>
327 bool isTryToContinue)
const;
330 template <
bool IS_EVENT>
339 template <
bool IS_EVENT>
348 template <
bool IS_EVENT>
358 template <
typename T>
362 typename T::TransitionInfoType
const&,
388 template <
typename T>
390 typename T::TransitionInfoType
const&,
393 typename T::Context
const*);
404 template <
typename T>
408 typename T::TransitionInfoType
const& transitionInfo,
412 typename T::Context
const*
context,
413 oneapi::tbb::task_group* iGroup)
442 std::exception_ptr temp_excptr;
451 temp_excptr = std::current_exception();
453 excptr = temp_excptr;
457 }
else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
459 }
else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
478 std::exception_ptr ptr;
479 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
info, streamID, parentContext, sContext);
512 template <
typename T,
typename DUMMY =
void>
516 typename T::TransitionInfoType
const&,
523 template <
typename DUMMY>
532 m_eventTransitionInfo(eventTransitionInfo),
533 m_parentContext(parentContext),
535 m_serviceToken(
token) {}
543 std::exception_ptr temp_excptr;
548 m_worker->emitPostModuleEventPrefetchingSignal();
550 temp_excptr = std::current_exception();
552 excptr = temp_excptr;
557 if (
auto queue = m_worker->serializeRunModule()) {
558 queue.push(*m_holder.group(),
560 info = m_eventTransitionInfo,
561 parentContext = m_parentContext,
562 serviceToken = m_serviceToken,
563 holder = m_holder]() {
567 std::exception_ptr ptr;
568 worker->runAcquireAfterAsyncPrefetch(ptr,
info, parentContext, holder);
574 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext,
std::move(m_holder));
591 oneapi::tbb::task_group*
group,
631 template <
typename T>
632 class ModuleSignalSentry {
635 typename T::Context
const*
context,
637 : a_(
a), context_(
context), moduleCallingContext_(moduleCallingContext) {}
639 ~ModuleSignalSentry() {
646 T::postModuleSignal(a_, context_, moduleCallingContext_);
651 void preModuleSignal() {
653 T::preModuleSignal(a_, context_, moduleCallingContext_);
656 void postModuleSignal() {
662 T::postModuleSignal(
temp, context_, moduleCallingContext_);
668 typename T::Context
const* context_;
674 namespace workerhelper {
712 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
715 cpp.preModuleSignal();
720 cpp.postModuleSignal();
745 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
746 cpp.preModuleSignal();
748 cpp.postModuleSignal();
773 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
774 cpp.preModuleSignal();
776 cpp.postModuleSignal();
801 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
802 cpp.preModuleSignal();
804 cpp.postModuleSignal();
830 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
831 cpp.preModuleSignal();
833 cpp.postModuleSignal();
858 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
859 cpp.preModuleSignal();
861 cpp.postModuleSignal();
888 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
889 cpp.preModuleSignal();
891 cpp.postModuleSignal();
919 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
920 cpp.preModuleSignal();
922 cpp.postModuleSignal();
949 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
950 cpp.preModuleSignal();
952 cpp.postModuleSignal();
972 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
973 cpp.preModuleSignal();
975 cpp.postModuleSignal();
995 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
996 cpp.preModuleSignal();
998 cpp.postModuleSignal();
1010 template <
typename T>
1014 typename T::TransitionInfoType
const& transitionInfo,
1016 Principal const& principal = transitionInfo.principal();
1022 }
else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1024 }
else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1036 template <
typename T>
1038 typename T::TransitionInfoType
const& transitionInfo,
1042 typename T::Context
const*
context) {
1048 bool expected =
false;
1049 bool workStarted =
workStarted_.compare_exchange_strong(expected,
true);
1067 struct DestroyTask {
1071 auto p = m_task.exchange(
nullptr);
1080 std::atomic<edm::WaitingTask*> m_task;
1084 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1088 [
this, weakToken, transitionInfo, parentContext, ownRunTask,
group](std::exception_ptr
const* iExcept) {
1091 AcquireTask<T> t(
this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1097 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1099 auto selectionTask =
1101 std::exception_ptr
const*)
mutable {
1126 template <
typename T>
1128 typename T::TransitionInfoType
const& transitionInfo,
1131 typename T::Context
const*
context) {
1132 std::exception_ptr exceptionPtr;
1133 bool shouldRun =
true;
1136 exceptionPtr = iEPtr;
1137 setException<T::isEvent_>(exceptionPtr);
1141 setPassed<T::isEvent_>();
1148 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext,
context); }
catch (...) {
1149 exceptionPtr = std::current_exception();
1155 return exceptionPtr;
1158 template <
typename T>
1160 typename T::TransitionInfoType
const& transitionInfo,
1164 typename T::Context
const*
context) {
1170 bool expected =
false;
1171 auto workStarted =
workStarted_.compare_exchange_strong(expected,
true);
1176 auto toDo = [
this,
info = transitionInfo, streamID, parentContext,
context, weakToken]() {
1177 std::exception_ptr exceptionPtr;
1183 this->runModule<T>(
info, streamID, parentContext,
context);
1185 exceptionPtr = std::current_exception();
1192 auto afterPrefetch =
1206 WaitingTaskHolder(*
group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1218 template <
typename T>
1222 typename T::Context
const*
context) {
1229 timesRun_.fetch_add(1, std::memory_order_relaxed);
1239 setPassed<T::isEvent_>();
1241 setFailed<T::isEvent_>();
1248 setException<T::isEvent_>(std::current_exception());
1251 rc = setPassed<T::isEvent_>();
1258 template <
typename T>
1262 typename T::Context
const*
context) {
1264 std::exception_ptr prefetchingException;
1265 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext,
context);
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool hasAcquire() const =0
std::atomic< int > timesVisited_
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
virtual bool wantsStreamRuns() const =0
ModuleDescription const * moduleDescription() const
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
void push(oneapi::tbb::task_group &iG, F &&iF)
static SerialTaskQueue * pauseGlobalQueue(Worker *)
T::Context const * m_context
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
std::atomic< int > numberOfPathsLeftToRun_
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual bool hasAccumulator() const =0
unsigned int ProductResolverIndex
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::exception_ptr setException(std::exception_ptr iException)
static bool needToRunSelection(Worker const *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool wantsTransition(Worker const *iWorker)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
static bool needToRunSelection(Worker const *iWorker)
ParentContext const m_parentContext
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
ParentContext const m_parentContext
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::atomic< int > timesExcept_
static bool wantsTransition(Worker const *iWorker)
oneapi::tbb::task_group * m_group
std::atomic< bool > workStarted_
static bool needToRunSelection(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
void emitPostModuleStreamPrefetchingSignal()
bool needsESPrefetching(Transition iTrans) const noexcept
WaitingTask * m_runModuleTask
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
static bool needToRunSelection(Worker const *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * enableGlobalQueue(Worker *)
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
void beginStream(StreamID id, StreamContext &streamContext)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
EventTransitionInfo m_eventTransitionInfo
TaskQueueAdaptor()=default
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual std::string workerType() const =0
ExceptionToActionTable const * actions_
static bool needToRunSelection(Worker const *iWorker)
LimitedTaskQueue * limited_
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
ModuleCallingContext moduleCallingContext_
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
std::exception_ptr cached_exception_
static SerialTaskQueue * enableGlobalQueue(Worker *)
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
static bool needToRunSelection(Worker const *iWorker)
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
static bool needToRunSelection(Worker const *iWorker)
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
std::exception_ptr exceptionPtr() const
Returns exception thrown by dependent task.
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool resume()
Resumes processing if the queue was paused.
static bool wantsTransition(Worker const *iWorker)
EnableQueueGuard(EnableQueueGuard &&iGuard)
Worker & operator=(Worker const &)=delete
static bool needToRunSelection(Worker const *iWorker)
edm::WaitingTaskList waitingTasks_
T::TransitionInfoType m_transitionInfo
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
static SerialTaskQueue * enableGlobalQueue(Worker *)
ServiceWeakToken m_serviceToken
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
static bool wantsTransition(Worker const *iWorker)
virtual bool wantsInputProcessBlocks() const =0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
virtual bool wantsProcessBlocks() const =0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
EnableQueueGuard(SerialTaskQueue *iQueue)
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
static bool wantsTransition(Worker const *iWorker)
#define CMS_THREAD_GUARD(_var_)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
std::atomic< State > state_
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * enableGlobalQueue(Worker *)
void callWhenDoneAsync(WaitingTaskHolder task)
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
WaitingTaskWithArenaHolder m_holder
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
static bool needToRunSelection(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void implEndJob()=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
ServiceToken lock() const
virtual size_t transformIndex(edm::BranchDescription const &) const =0
ServiceWeakToken m_serviceToken
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
virtual Types moduleType() const =0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
ModuleDescription const * description() const
static bool wantsTransition(Worker const *iWorker)
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
virtual bool implNeedToRunSelection() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
void respondToCloseOutputFile()
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
void skipOnPath(EventPrincipal const &iEvent)
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void implBeginStream(StreamID)=0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
virtual bool wantsStreamLuminosityBlocks() const =0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
void emitPostModuleEventPrefetchingSignal()
ParentContext const m_parentContext
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
StreamContext const * getStreamContext() const
void checkForShouldTryToContinue(ModuleDescription const &)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
std::atomic< int > timesPassed_
static bool needToRunSelection(Worker const *iWorker)
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void endStream(StreamID id, StreamContext &streamContext)
void postDoEvent(EventPrincipal const &)
virtual bool wantsGlobalLuminosityBlocks() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool needToRunSelection(Worker const *iWorker)
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
static bool wantsTransition(Worker const *iWorker)
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
std::atomic< int > timesRun_
static bool needToRunSelection(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
GlobalContext const * getGlobalContext() const
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::WaitingTaskList & waitingTaskList()
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
BranchType const & branchType() const
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
bool shouldTryToContinue_
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup)
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0