1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 70 #include <unordered_map> 76 class ModuleProcessName;
77 class ProductResolverIndexHelper;
78 class ProductResolverIndexAndSkipBit;
81 class ProductRegistry;
82 class ThinnedAssociationsHelper;
84 namespace workerhelper {
88 namespace eventsetup {
108 void push(oneapi::tbb::task_group& iG,
F&& iF) {
149 template <
typename T>
151 typename T::TransitionInfoType
const&,
155 typename T::Context
const*) noexcept;
157 template <
typename T>
159 typename T::TransitionInfoType
const&,
163 typename T::Context
const*) noexcept;
165 template <
typename T>
169 typename T::Context
const*) noexcept;
173 size_t iTransformIndex,
219 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const&
224 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
226 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
230 virtual std::vector<ConsumesInfo>
consumesInfo()
const = 0;
236 timesRun_.store(0, std::memory_order_release);
260 template <
typename O>
276 size_t iTransformIndex,
304 template <
typename T>
317 Principal const& iPrincipal)
const noexcept = 0;
330 bool isTryToContinue)
const noexcept;
333 template <
bool IS_EVENT>
342 template <
bool IS_EVENT>
351 template <
bool IS_EVENT>
361 template <
typename T>
365 typename T::TransitionInfoType
const&,
393 typename
T::TransitionInfoType
const&,
412 typename T::TransitionInfoType
const& transitionInfo,
416 typename T::Context
const*
context,
417 oneapi::tbb::task_group* iGroup) noexcept
419 m_transitionInfo(transitionInfo),
420 m_streamID(streamID),
421 m_parentContext(parentContext),
423 m_serviceToken(
token),
429 EnableQueueGuard(EnableQueueGuard
const&) =
delete;
430 EnableQueueGuard&
operator=(EnableQueueGuard
const&) =
delete;
431 EnableQueueGuard&
operator=(EnableQueueGuard&&) =
delete;
446 std::exception_ptr temp_excptr;
447 auto excptr = exceptionPtr();
449 if (!m_worker->hasAcquire()) {
453 m_worker->emitPostModuleEventPrefetchingSignal();
455 temp_excptr = std::current_exception();
457 excptr = temp_excptr;
461 }
else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
462 m_worker->emitPostModuleStreamPrefetchingSignal();
463 }
else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
464 m_worker->emitPostModuleGlobalPrefetchingSignal();
468 if (
auto queue = m_worker->serializeRunModule()) {
469 auto f = [worker = m_worker,
470 info = m_transitionInfo,
471 streamID = m_streamID,
472 parentContext = m_parentContext,
473 sContext = m_context,
474 serviceToken = m_serviceToken]() {
482 std::exception_ptr ptr;
483 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
info, streamID, parentContext, sContext);
488 gQueue->push(*m_group, [
queue, gQueue,
f,
group = m_group]()
mutable {
499 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
516 template <
typename T,
typename DUMMY =
void>
520 typename T::TransitionInfoType
const&,
527 template <
typename DUMMY>
536 m_eventTransitionInfo(eventTransitionInfo),
537 m_parentContext(parentContext),
539 m_serviceToken(
token) {}
547 std::exception_ptr temp_excptr;
552 m_worker->emitPostModuleEventPrefetchingSignal();
554 temp_excptr = std::current_exception();
556 excptr = temp_excptr;
561 if (
auto queue = m_worker->serializeRunModule()) {
562 queue.push(*m_holder.group(),
564 info = m_eventTransitionInfo,
565 parentContext = m_parentContext,
566 serviceToken = m_serviceToken,
567 holder = m_holder]() {
571 std::exception_ptr ptr;
572 worker->runAcquireAfterAsyncPrefetch(ptr,
info, parentContext, holder);
578 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext,
std::move(m_holder));
595 oneapi::tbb::task_group*
group,
635 template <
typename T>
636 class ModuleSignalSentry {
639 typename T::Context
const*
context,
641 : a_(
a), context_(
context), moduleCallingContext_(moduleCallingContext) {}
643 ~ModuleSignalSentry() {
650 T::postModuleSignal(a_, context_, moduleCallingContext_);
655 void preModuleSignal() {
660 ex.
addContext(
"Handling pre module signal, likely in a service function immediately before module method");
665 void postModuleSignal() {
674 ex.
addContext(
"Handling post module signal, likely in a service function immediately after module method");
682 typename T::Context
const* context_;
688 namespace workerhelper {
707 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
726 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
729 cpp.preModuleSignal();
734 cpp.postModuleSignal();
743 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
760 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
761 cpp.preModuleSignal();
763 cpp.postModuleSignal();
772 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
792 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
793 cpp.preModuleSignal();
795 cpp.postModuleSignal();
805 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
825 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
826 cpp.preModuleSignal();
828 cpp.postModuleSignal();
838 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
856 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
857 cpp.preModuleSignal();
859 cpp.postModuleSignal();
868 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
873 return iWorker->globalLuminosityBlocksQueue();
887 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
888 cpp.preModuleSignal();
890 cpp.postModuleSignal();
899 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
920 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
921 cpp.preModuleSignal();
923 cpp.postModuleSignal();
933 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
939 return iWorker->globalLuminosityBlocksQueue();
955 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
956 cpp.preModuleSignal();
958 cpp.postModuleSignal();
968 worker->esPrefetchAsync(waitingTask,
info.eventSetupImpl(), transition,
token);
985 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
986 cpp.preModuleSignal();
988 cpp.postModuleSignal();
1008 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
1009 cpp.preModuleSignal();
1011 cpp.postModuleSignal();
1031 ModuleSignalSentry<Arg> cpp(actReg,
context, mcc);
1032 cpp.preModuleSignal();
1034 cpp.postModuleSignal();
1046 template <
typename T>
1050 typename T::TransitionInfoType
const& transitionInfo,
1052 Principal const& principal = transitionInfo.principal();
1057 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1058 }
else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1059 actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1060 }
else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1061 actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1065 edPrefetchAsync(iTask,
token, principal);
1068 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1072 template <
typename T>
1074 typename T::TransitionInfoType
const& transitionInfo,
1078 typename T::Context
const*
context) noexcept {
1084 bool expected =
false;
1085 bool workStarted = workStarted_.compare_exchange_strong(expected,
true);
1087 waitingTasks_.add(
task);
1089 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1103 struct DestroyTask {
1106 ~DestroyTask() noexcept {
1107 auto p = m_task.exchange(
nullptr);
1116 std::atomic<edm::WaitingTask*> m_task;
1120 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1124 [
this, weakToken, transitionInfo, parentContext, ownRunTask,
group](std::exception_ptr
const* iExcept) {
1127 AcquireTask<T> t(
this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1133 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1135 auto selectionTask =
1137 std::exception_ptr
const*)
mutable {
1145 prePrefetchSelectionAsync(*
group, selectionTask,
token, streamID, &transitionInfo.principal());
1162 template <
typename T>
1164 typename T::TransitionInfoType
const& transitionInfo,
1167 typename T::Context
const*
context) noexcept {
1168 std::exception_ptr exceptionPtr;
1169 bool shouldRun =
true;
1171 if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1172 exceptionPtr = iEPtr;
1173 setException<T::isEvent_>(exceptionPtr);
1176 if (not shouldTryToContinue_) {
1177 setPassed<T::isEvent_>();
1184 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext,
context); }
catch (...) {
1185 exceptionPtr = std::current_exception();
1190 waitingTasks_.doneWaiting(exceptionPtr);
1191 return exceptionPtr;
1194 template <
typename T>
1196 typename T::TransitionInfoType
const& transitionInfo,
1200 typename T::Context
const*
context) noexcept {
1206 bool expected =
false;
1207 auto workStarted = workStarted_.compare_exchange_strong(expected,
true);
1209 waitingTasks_.add(
task);
1212 auto toDo = [
this,
info = transitionInfo, streamID, parentContext,
context, weakToken]() {
1213 std::exception_ptr exceptionPtr;
1219 this->runModule<T>(
info, streamID, parentContext,
context);
1221 exceptionPtr = std::current_exception();
1223 this->waitingTasks_.doneWaiting(exceptionPtr);
1226 if (needsESPrefetching(T::transition_)) {
1228 auto afterPrefetch =
1231 this->waitingTasks_.doneWaiting(*iExcept);
1233 if (
auto queue = this->serializeRunModule()) {
1242 WaitingTaskHolder(*
group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1245 if (
auto queue = this->serializeRunModule()) {
1254 template <
typename T>
1258 typename T::Context
const*
context) {
1265 timesRun_.fetch_add(1, std::memory_order_relaxed);
1275 setPassed<T::isEvent_>();
1277 setFailed<T::isEvent_>();
1284 setException<T::isEvent_>(std::current_exception());
1287 rc = setPassed<T::isEvent_>();
1294 template <
typename T>
1298 typename T::Context
const*
context) noexcept {
1299 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1300 std::exception_ptr prefetchingException;
1301 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext,
context);
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const noexcept
std::atomic< int > timesVisited_
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
void push(oneapi::tbb::task_group &iG, F &&iF)
T::Context const * m_context
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool wantsTransition(Worker const *iWorker) noexcept
std::atomic< int > numberOfPathsLeftToRun_
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
std::exception_ptr setException(std::exception_ptr iException)
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
int timesPassed() const noexcept
std::exception_ptr exceptionPtr() const noexcept
Returns exception thrown by dependent task.
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder) noexcept
int timesVisited() const noexcept
int timesFailed() const noexcept
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker) noexcept
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
ParentContext const m_parentContext
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
ParentContext const m_parentContext
std::atomic< int > timesExcept_
oneapi::tbb::task_group * m_group
std::atomic< bool > workStarted_
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
void emitPostModuleStreamPrefetchingSignal()
bool needsESPrefetching(Transition iTrans) const noexcept
WaitingTask * m_runModuleTask
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *) noexcept
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
void clearCounters() noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
int timesRun() const noexcept
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
virtual bool hasAcquire() const noexcept=0
static bool wantsTransition(Worker const *iWorker) noexcept
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept=0
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void beginStream(StreamID id, StreamContext &streamContext)
static bool needToRunSelection(Worker const *iWorker) noexcept
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
EventTransitionInfo m_eventTransitionInfo
TaskQueueAdaptor()=default
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual std::string workerType() const =0
ModuleDescription const * moduleDescription() const noexcept
ExceptionToActionTable const * actions_
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const noexcept=0
LimitedTaskQueue * limited_
bool ranAcquireWithoutException_
virtual bool hasAccumulator() const noexcept=0
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
ModuleCallingContext moduleCallingContext_
virtual bool wantsInputProcessBlocks() const noexcept=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const noexcept
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext) noexcept
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &) noexcept=0
static bool needToRunSelection(Worker const *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
std::exception_ptr cached_exception_
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
bool resume()
Resumes processing if the queue was paused.
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
virtual bool wantsProcessBlocks() const noexcept=0
static bool needToRunSelection(Worker const *iWorker) noexcept
EnableQueueGuard(EnableQueueGuard &&iGuard)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
Worker & operator=(Worker const &)=delete
static bool needToRunSelection(Worker const *iWorker) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
edm::WaitingTaskList waitingTasks_
T::TransitionInfoType m_transitionInfo
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ServiceWeakToken m_serviceToken
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
static bool wantsTransition(Worker const *iWorker) noexcept
EnableQueueGuard(SerialTaskQueue *iQueue)
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
GlobalContext const * getGlobalContext() const noexcept(false)
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
virtual bool implNeedToRunSelection() const noexcept=0
#define CMS_THREAD_GUARD(_var_)
int timesExcept() const noexcept
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
std::atomic< State > state_
ModuleDescription const * description() const noexcept
def template(fileName, svg, replaceme="REPLACEME")
void callWhenDoneAsync(WaitingTaskHolder task)
WaitingTaskWithArenaHolder m_holder
static bool needToRunSelection(Worker const *iWorker) noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition) noexcept
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void implEndJob()=0
virtual bool wantsGlobalLuminosityBlocks() const noexcept=0
virtual bool wantsStreamRuns() const noexcept=0
ServiceToken lock() const
static bool wantsTransition(Worker const *iWorker) noexcept
ServiceWeakToken m_serviceToken
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition) noexcept
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
StreamContext const * getStreamContext() const noexcept(false)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual SerialTaskQueue * globalRunsQueue()=0
virtual bool wantsStreamLuminosityBlocks() const noexcept=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext) noexcept
virtual void implBeginJob()=0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
void respondToOpenInputFile(FileBlock const &fb)
static bool needToRunSelection(Worker const *iWorker) noexcept
void respondToCloseOutputFile()
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
void skipOnPath(EventPrincipal const &iEvent)
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool needToRunSelection(Worker const *iWorker) noexcept
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker) noexcept
void addedToPath() noexcept
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
virtual void convertCurrentProcessAlias(std::string const &processName)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
void emitPostModuleEventPrefetchingSignal()
ParentContext const m_parentContext
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
edm::WaitingTaskList & waitingTaskList() noexcept
void checkForShouldTryToContinue(ModuleDescription const &)
std::atomic< int > timesPassed_
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
void endStream(StreamID id, StreamContext &streamContext)
void postDoEvent(EventPrincipal const &)
static bool wantsTransition(Worker const *iWorker) noexcept
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool wantsTransition(Worker const *iWorker) noexcept
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
int timesPass() const noexcept
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &) noexcept
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
std::atomic< int > timesRun_
static bool wantsTransition(Worker const *iWorker) noexcept
static bool needToRunSelection(Worker const *iWorker) noexcept
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
static bool wantsTransition(Worker const *iWorker) noexcept
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
static bool wantsTransition(Worker const *iWorker) noexcept
State state() const noexcept
virtual size_t transformIndex(edm::BranchDescription const &) const noexcept=0
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const noexcept=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
BranchType const & branchType() const
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
static bool needToRunSelection(Worker const *iWorker) noexcept
bool shouldTryToContinue_
static SerialTaskQueue * enableGlobalQueue(Worker *) noexcept
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0