1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 71 #include <unordered_map> 77 class ModuleProcessName;
78 class ProductResolverIndexHelper;
79 class ProductResolverIndexAndSkipBit;
80 class ProductRegistry;
81 class ThinnedAssociationsHelper;
83 namespace workerhelper {
87 namespace eventsetup {
88 class ESRecordsToProductResolverIndices;
107 void push(oneapi::tbb::task_group& iG,
F&& iF) {
148 template <
typename T>
150 typename T::TransitionInfoType
const&,
154 typename T::Context
const*) noexcept;
156 template <
typename T>
158 typename T::TransitionInfoType
const&,
162 typename T::Context
const*) noexcept;
164 template <
typename T>
168 typename T::Context
const*) noexcept;
172 size_t iTransformIndex,
218 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const&
223 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
225 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
229 virtual std::vector<ConsumesInfo>
consumesInfo()
const = 0;
235 timesRun_.store(0, std::memory_order_release);
259 template <
typename O>
275 size_t iTransformIndex,
303 template <
typename T>
316 Principal const& iPrincipal)
const noexcept = 0;
329 bool isTryToContinue)
const noexcept;
332 template <
bool IS_EVENT>
341 template <
bool IS_EVENT>
350 template <
bool IS_EVENT>
360 template <
typename T>
364 typename T::TransitionInfoType
const&,
392 typename
T::TransitionInfoType
const&,
411 typename T::TransitionInfoType
const& transitionInfo,
415 typename T::Context
const*
context,
416 oneapi::tbb::task_group* iGroup) noexcept
418 m_transitionInfo(transitionInfo),
419 m_streamID(streamID),
420 m_parentContext(parentContext),
422 m_serviceToken(
token),
428 EnableQueueGuard(EnableQueueGuard
const&) =
delete;
429 EnableQueueGuard&
operator=(EnableQueueGuard
const&) =
delete;
430 EnableQueueGuard&
operator=(EnableQueueGuard&&) =
delete;
445 std::exception_ptr temp_excptr;
446 auto excptr = exceptionPtr();
448 if (!m_worker->hasAcquire()) {
452 m_worker->emitPostModuleEventPrefetchingSignal();
454 temp_excptr = std::current_exception();
456 excptr = temp_excptr;
460 }
else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
461 m_worker->emitPostModuleStreamPrefetchingSignal();
462 }
else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
463 m_worker->emitPostModuleGlobalPrefetchingSignal();
467 if (
auto queue = m_worker->serializeRunModule()) {
468 auto f = [worker = m_worker,
469 info = m_transitionInfo,
470 streamID = m_streamID,
471 parentContext = m_parentContext,
472 sContext = m_context,
473 serviceToken = m_serviceToken]() {
481 std::exception_ptr ptr;
482 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
info, streamID, parentContext, sContext);
487 gQueue->push(*m_group, [
queue, gQueue,
f,
group = m_group]()
mutable {
498 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
515 template <
typename T,
typename DUMMY =
void>
519 typename T::TransitionInfoType
const&,
526 template <
typename DUMMY>
535 m_eventTransitionInfo(eventTransitionInfo),
536 m_parentContext(parentContext),
538 m_serviceToken(
token) {}
546 std::exception_ptr temp_excptr;
551 m_worker->emitPostModuleEventPrefetchingSignal();
553 temp_excptr = std::current_exception();
555 excptr = temp_excptr;
560 if (
auto queue = m_worker->serializeRunModule()) {
561 queue.push(*m_holder.group(),
563 info = m_eventTransitionInfo,
564 parentContext = m_parentContext,
565 serviceToken = m_serviceToken,
566 holder = m_holder]() {
570 std::exception_ptr ptr;
571 worker->runAcquireAfterAsyncPrefetch(ptr,
info, parentContext, holder);
577 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext,
std::move(m_holder));
594 oneapi::tbb::task_group*
group,
634 template <
typename T>
635 class ModuleSignalSentry {
638 typename T::Context
const*
context,
640 : a_(
a), context_(
context), moduleCallingContext_(moduleCallingContext) {}
642 ~ModuleSignalSentry() {
649 T::postModuleSignal(a_, context_, moduleCallingContext_);
654 void preModuleSignal() {
659 ex.
addContext(
"Handling pre module signal, likely in a service function immediately before module method");
664 void postModuleSignal() {
673 ex.
addContext(
"Handling post module signal, likely in a service function immediately after module method");
681 typename T::Context
const* context_;
687 namespace workerhelper {
706 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
725 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
728 cpp.preModuleSignal();
733 cpp.postModuleSignal();
742 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
759 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
760 cpp.preModuleSignal();
762 cpp.postModuleSignal();
771 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
791 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
792 cpp.preModuleSignal();
794 cpp.postModuleSignal();
804 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
824 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
825 cpp.preModuleSignal();
827 cpp.postModuleSignal();
837 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
855 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
856 cpp.preModuleSignal();
858 cpp.postModuleSignal();
867 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
872 return iWorker->globalLuminosityBlocksQueue();
886 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
887 cpp.preModuleSignal();
889 cpp.postModuleSignal();
898 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
919 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
920 cpp.preModuleSignal();
922 cpp.postModuleSignal();
932 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
938 return iWorker->globalLuminosityBlocksQueue();
954 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
955 cpp.preModuleSignal();
957 cpp.postModuleSignal();
967 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
984 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
985 cpp.preModuleSignal();
987 cpp.postModuleSignal();
1008 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
1009 cpp.preModuleSignal();
1011 cpp.postModuleSignal();
1034 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
1035 cpp.preModuleSignal();
1037 cpp.postModuleSignal();
1051 template <
typename T>
1055 typename T::TransitionInfoType
const& transitionInfo,
1057 Principal const& principal = transitionInfo.principal();
1062 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1063 }
else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1064 actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1065 }
else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1066 actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1070 edPrefetchAsync(iTask,
token, principal);
1073 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1077 template <
typename T>
1079 typename T::TransitionInfoType
const& transitionInfo,
1083 typename T::Context
const*
context) noexcept {
1089 bool expected =
false;
1090 bool workStarted = workStarted_.compare_exchange_strong(expected,
true);
1092 waitingTasks_.add(
task);
1094 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1108 struct DestroyTask {
1111 ~DestroyTask() noexcept {
1112 auto p = m_task.exchange(
nullptr);
1121 std::atomic<edm::WaitingTask*> m_task;
1125 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1129 [
this, weakToken, transitionInfo, parentContext, ownRunTask,
group](std::exception_ptr
const* iExcept) {
1132 AcquireTask<T> t(
this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1138 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1140 auto selectionTask =
1142 std::exception_ptr
const*)
mutable {
1150 prePrefetchSelectionAsync(*
group, selectionTask,
token, streamID, &transitionInfo.principal());
1167 template <
typename T>
1169 typename T::TransitionInfoType
const& transitionInfo,
1172 typename T::Context
const*
context) noexcept {
1173 std::exception_ptr exceptionPtr;
1174 bool shouldRun =
true;
1176 if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1177 exceptionPtr = iEPtr;
1178 setException<T::isEvent_>(exceptionPtr);
1181 if (not shouldTryToContinue_) {
1182 setPassed<T::isEvent_>();
1189 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext,
context); }
catch (...) {
1190 exceptionPtr = std::current_exception();
1195 waitingTasks_.doneWaiting(exceptionPtr);
1196 return exceptionPtr;
1199 template <
typename T>
1201 typename T::TransitionInfoType
const& transitionInfo,
1205 typename T::Context
const*
context) noexcept {
1211 bool expected =
false;
1212 auto workStarted = workStarted_.compare_exchange_strong(expected,
true);
1214 waitingTasks_.add(
task);
1217 auto toDo = [
this,
info = transitionInfo, streamID, parentContext,
context, weakToken]() {
1218 std::exception_ptr exceptionPtr;
1224 this->runModule<T>(
info, streamID, parentContext,
context);
1226 exceptionPtr = std::current_exception();
1228 this->waitingTasks_.doneWaiting(exceptionPtr);
1231 if (needsESPrefetching(T::transition_)) {
1233 auto afterPrefetch =
1236 this->waitingTasks_.doneWaiting(*iExcept);
1238 if (
auto queue = this->serializeRunModule()) {
1247 WaitingTaskHolder(*
group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1250 if (
auto queue = this->serializeRunModule()) {
1259 template <
typename T>
1263 typename T::Context
const*
context) {
1270 timesRun_.fetch_add(1, std::memory_order_relaxed);
1280 setPassed<T::isEvent_>();
1282 setFailed<T::isEvent_>();
1289 setException<T::isEvent_>(std::current_exception());
1292 rc = setPassed<T::isEvent_>();
1299 template <
typename T>
1303 typename T::Context
const*
context) noexcept {
1304 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1305 std::exception_ptr prefetchingException;
1306 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext,
context);
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const noexcept
std::atomic< int > timesVisited_
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
void push(oneapi::tbb::task_group &iG, F &&iF)
T::Context const * m_context
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool wantsTransition(Worker const *iWorker) noexcept
std::atomic< int > numberOfPathsLeftToRun_
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
std::exception_ptr setException(std::exception_ptr iException)
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
int timesPassed() const noexcept
std::exception_ptr exceptionPtr() const noexcept
Returns exception thrown by dependent task.
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder) noexcept
int timesVisited() const noexcept
int timesFailed() const noexcept
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker) noexcept
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
ParentContext const m_parentContext
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
ParentContext const m_parentContext
std::atomic< int > timesExcept_
oneapi::tbb::task_group * m_group
std::atomic< bool > workStarted_
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
void emitPostModuleStreamPrefetchingSignal()
bool needsESPrefetching(Transition iTrans) const noexcept
WaitingTask * m_runModuleTask
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *) noexcept
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
void clearCounters() noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
int timesRun() const noexcept
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
virtual bool hasAcquire() const noexcept=0
static bool wantsTransition(Worker const *iWorker) noexcept
void beginStream(StreamID, StreamContext const &)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept=0
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool needToRunSelection(Worker const *iWorker) noexcept
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
EventTransitionInfo m_eventTransitionInfo
TaskQueueAdaptor()=default
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual std::string workerType() const =0
ModuleDescription const * moduleDescription() const noexcept
ExceptionToActionTable const * actions_
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const noexcept=0
LimitedTaskQueue * limited_
bool ranAcquireWithoutException_
virtual bool hasAccumulator() const noexcept=0
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
ModuleCallingContext moduleCallingContext_
virtual bool wantsInputProcessBlocks() const noexcept=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const noexcept
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext) noexcept
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &) noexcept=0
static bool needToRunSelection(Worker const *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
std::exception_ptr cached_exception_
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
bool resume()
Resumes processing if the queue was paused.
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
virtual bool wantsProcessBlocks() const noexcept=0
static bool needToRunSelection(Worker const *iWorker) noexcept
EnableQueueGuard(EnableQueueGuard &&iGuard)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
Worker & operator=(Worker const &)=delete
static bool needToRunSelection(Worker const *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
edm::WaitingTaskList waitingTasks_
T::TransitionInfoType m_transitionInfo
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ServiceWeakToken m_serviceToken
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
static bool wantsTransition(Worker const *iWorker) noexcept
void endStream(StreamID, StreamContext const &)
EnableQueueGuard(SerialTaskQueue *iQueue)
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
GlobalContext const * getGlobalContext() const noexcept(false)
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
virtual bool implNeedToRunSelection() const noexcept=0
#define CMS_THREAD_GUARD(_var_)
int timesExcept() const noexcept
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
std::atomic< State > state_
ModuleDescription const * description() const noexcept
def template(fileName, svg, replaceme="REPLACEME")
void callWhenDoneAsync(WaitingTaskHolder task)
WaitingTaskWithArenaHolder m_holder
static bool needToRunSelection(Worker const *iWorker) noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition) noexcept
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void implEndJob()=0
virtual bool wantsGlobalLuminosityBlocks() const noexcept=0
virtual bool wantsStreamRuns() const noexcept=0
ServiceToken lock() const
static bool wantsTransition(Worker const *iWorker) noexcept
ServiceWeakToken m_serviceToken
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition) noexcept
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
StreamContext const * getStreamContext() const noexcept(false)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual SerialTaskQueue * globalRunsQueue()=0
virtual bool wantsStreamLuminosityBlocks() const noexcept=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext) noexcept
virtual void implBeginJob()=0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
void respondToOpenInputFile(FileBlock const &fb)
static bool needToRunSelection(Worker const *iWorker) noexcept
void respondToCloseOutputFile()
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
void skipOnPath(EventPrincipal const &iEvent)
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool needToRunSelection(Worker const *iWorker) noexcept
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker) noexcept
void addedToPath() noexcept
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
virtual void convertCurrentProcessAlias(std::string const &processName)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
void emitPostModuleEventPrefetchingSignal()
ParentContext const m_parentContext
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
edm::WaitingTaskList & waitingTaskList() noexcept
void checkForShouldTryToContinue(ModuleDescription const &)
std::atomic< int > timesPassed_
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void endJob(GlobalContext const &)
void postDoEvent(EventPrincipal const &)
static bool wantsTransition(Worker const *iWorker) noexcept
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool wantsTransition(Worker const *iWorker) noexcept
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
int timesPass() const noexcept
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &) noexcept
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
std::atomic< int > timesRun_
static bool wantsTransition(Worker const *iWorker) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
void beginJob(GlobalContext const &)
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
static bool wantsTransition(Worker const *iWorker) noexcept
State state() const noexcept
virtual size_t transformIndex(edm::BranchDescription const &) const noexcept=0
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const noexcept=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
BranchType const & branchType() const
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
static bool needToRunSelection(Worker const *iWorker) noexcept
bool shouldTryToContinue_
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0