|
|
Go to the documentation of this file. 1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
67 #include <unordered_map>
73 class ProductResolverIndexHelper;
74 class ProductResolverIndexAndSkipBit;
77 class ProductRegistry;
78 class ThinnedAssociationsHelper;
80 namespace workerhelper {
84 namespace eventsetup {
136 template <
typename T>
145 template <
typename T>
147 typename T::TransitionInfoType
const&,
151 typename T::Context
const*);
153 template <
typename T>
155 typename T::TransitionInfoType
const&,
159 typename T::Context
const*);
161 template <
typename T>
165 typename T::Context
const*);
201 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const&
205 std::vector<ModuleDescription const*>&
modules,
207 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
211 virtual std::vector<ConsumesInfo>
consumesInfo()
const = 0;
216 timesRun_.store(0, std::memory_order_release);
237 template <
typename O>
271 template <
typename T>
298 template <
bool IS_EVENT>
307 template <
bool IS_EVENT>
316 template <
bool IS_EVENT>
326 template <
typename T>
343 template <
typename T>
345 typename T::TransitionInfoType
const&,
348 typename T::Context
const*);
359 template <
typename T>
363 typename T::TransitionInfoType
const& transitionInfo,
367 typename T::Context
const* context)
395 std::exception_ptr temp_excptr;
397 if constexpr (T::isEvent_) {
404 temp_excptr = std::current_exception();
406 excptr = &temp_excptr;
427 std::exception_ptr* ptr =
nullptr;
428 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
info, streamID, parentContext, sContext);
433 gQueue->push([
queue, gQueue,
f]()
mutable {
461 template <
typename T,
typename DUMMY =
void>
465 typename T::TransitionInfoType
const&,
472 template <
typename DUMMY>
481 m_eventTransitionInfo(eventTransitionInfo),
482 m_parentContext(parentContext),
484 m_serviceToken(
token) {}
492 std::exception_ptr temp_excptr;
497 m_worker->emitPostModuleEventPrefetchingSignal();
499 temp_excptr = std::current_exception();
501 excptr = &temp_excptr;
506 if (
auto queue = m_worker->serializeRunModule()) {
507 queue.push([worker = m_worker,
508 info = m_eventTransitionInfo,
509 parentContext = m_parentContext,
510 serviceToken = m_serviceToken,
511 holder = m_holder]() {
515 std::exception_ptr* ptr =
nullptr;
516 worker->runAcquireAfterAsyncPrefetch(ptr,
info, parentContext, holder);
522 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext,
std::move(m_holder));
573 template <
typename T>
574 class ModuleSignalSentry {
577 typename T::Context
const* context,
579 : a_(
a), context_(context), moduleCallingContext_(moduleCallingContext) {
581 T::preModuleSignal(a_, context, moduleCallingContext_);
584 ~ModuleSignalSentry() {
586 T::postModuleSignal(a_, context_, moduleCallingContext_);
591 typename T::Context
const* context_;
597 namespace workerhelper {
635 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
660 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
685 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
710 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
736 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
761 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
787 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
812 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
837 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
857 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
877 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
889 template <
typename T>
893 typename T::TransitionInfoType
const& transitionInfo,
895 Principal const& principal = transitionInfo.principal();
904 iTask->increment_ref_count();
913 if (0 == iTask->decrement_ref_count()) {
915 tbb::task::spawn(*iTask);
919 template <
typename T>
921 typename T::TransitionInfoType
const& transitionInfo,
925 typename T::Context
const* context) {
931 bool expected =
false;
932 bool workStarted =
workStarted_.compare_exchange_strong(expected,
true);
935 if constexpr (T::isEvent_) {
946 auto runTask =
new (tbb::task::allocate_root())
954 auto p = m_task.load();
956 tbb::task::destroy(*
p);
961 auto t = m_task.load();
962 m_task.store(
nullptr);
966 std::atomic<edm::WaitingTask*> m_task;
969 auto ownRunTask = std::make_shared<DestroyTask>(runTask);
971 tbb::task::allocate_root(),
972 [ownRunTask, parentContext,
info = transitionInfo,
token,
this](std::exception_ptr
const*)
mutable {
974 prefetchAsync<T>(ownRunTask->release(),
token, parentContext,
info, T::transition_);
978 WaitingTask* moduleTask =
new (tbb::task::allocate_root())
980 if constexpr (T::isEvent_) {
984 moduleTask =
new (tbb::task::allocate_root())
988 prefetchAsync<T>(moduleTask,
token, parentContext, transitionInfo, T::transition_);
993 template <
typename T>
995 typename T::TransitionInfoType
const& transitionInfo,
998 typename T::Context
const* context) {
999 std::exception_ptr exceptionPtr;
1003 exceptionPtr = *iEPtr;
1004 setException<T::isEvent_>(exceptionPtr);
1006 setPassed<T::isEvent_>();
1011 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); }
catch (...) {
1012 exceptionPtr = std::current_exception();
1016 return exceptionPtr;
1019 template <
typename T>
1021 typename T::TransitionInfoType
const& transitionInfo,
1025 typename T::Context
const* context) {
1031 bool expected =
false;
1032 auto workStarted =
workStarted_.compare_exchange_strong(expected,
true);
1036 auto toDo = [
this,
info = transitionInfo, streamID, parentContext, context, serviceToken]() {
1037 std::exception_ptr exceptionPtr;
1043 this->runModule<T>(
info, streamID, parentContext, context);
1045 exceptionPtr = std::current_exception();
1052 tbb::task::allocate_root(), [toDo =
std::move(toDo),
this](std::exception_ptr
const* iExcept) {
1060 tbb::task::spawn(*taskToDo);
1064 esPrefetchAsync(afterPrefetch, transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1070 tbb::task::spawn(*taskToDo);
1076 template <
typename T>
1080 typename T::Context
const* context) {
1081 if constexpr (T::isEvent_) {
1098 bool expected =
false;
1099 if (not
workStarted_.compare_exchange_strong(expected,
true)) {
1102 waitTask->increment_ref_count();
1106 waitTask->wait_for_all();
1129 if constexpr (T::isEvent_) {
1133 waitTask->set_ref_count(2);
1136 waitTask->decrement_ref_count();
1137 waitTask->wait_for_all();
1153 waitTask->set_ref_count(2);
1157 waitTask->decrement_ref_count();
1158 waitTask->wait_for_all();
1160 if (waitTask->exceptionPtr() !=
nullptr) {
1162 setException<T::isEvent_>(*waitTask->exceptionPtr());
1166 setPassed<T::isEvent_>();
1174 prefetchSentry.release();
1177 queue.pushAndWait([&]() {
1181 CMS_SA_ALLOW try { rc = runModule<T>(transitionInfo, streamID, parentContext, context); }
catch (...) {
1186 CMS_SA_ALLOW try { rc = runModule<T>(transitionInfo, streamID, parentContext, context); }
catch (...) {
1198 template <
typename T>
1202 typename T::Context
const* context) {
1208 if constexpr (T::isEvent_) {
1209 timesRun_.fetch_add(1, std::memory_order_relaxed);
1219 setPassed<T::isEvent_>();
1221 setFailed<T::isEvent_>();
1228 setException<T::isEvent_>(std::current_exception());
1231 rc = setPassed<T::isEvent_>();
1238 template <
typename T>
1242 typename T::Context
const* context) {
1244 std::exception_ptr
const* prefetchingException =
nullptr;
1245 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
static bool wantsTransition(Worker const *iWorker)
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
virtual std::vector< ConsumesInfo > consumesInfo() const =0
tbb::task * execute() override
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static constexpr void esPrefetchAsync(Worker *, WaitingTask *, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
void esPrefetchAsync(WaitingTask *, EventSetupImpl const &, Transition, ServiceToken const &)
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
void doWorkAsync(WaitingTask *, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
tbb::task * execute() override
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
static SerialTaskQueue * enableGlobalQueue(Worker *)
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
std::shared_ptr< ActivityRegistry > actReg_
virtual bool wantsGlobalRuns() const =0
virtual void implEndJob()=0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
static bool needToRunSelection(Worker const *iWorker)
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
std::atomic< int > timesPassed_
LimitedTaskQueue * limited_
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
bool resume()
Resumes processing if the queue was paused.
std::atomic< int > timesVisited_
void add(WaitingTask *)
Adds task to the waiting list.
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool wantsGlobalLuminosityBlocks() const =0
ServiceToken m_serviceToken
void resetModuleDescription(ModuleDescription const *)
virtual bool wantsInputProcessBlocks() const =0
void postDoEvent(EventPrincipal const &)
ExceptionToActionTable const * actions_
virtual void implBeginJob()=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
static constexpr void esPrefetchAsync(Worker *, WaitingTask *, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
void respondToOpenInputFile(FileBlock const &fb)
std::exception_ptr setException(std::exception_ptr iException)
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
std::atomic< int > numberOfPathsLeftToRun_
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
ServiceToken m_serviceToken
tbb::task * execute() override
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
ModuleDescription const * moduleDescription() const
const ParentContext m_parentContext
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
static bool needToRunSelection(Worker const *iWorker)
virtual bool wantsStreamLuminosityBlocks() const =0
static bool wantsTransition(Worker const *iWorker)
static bool needToRunSelection(Worker const *iWorker)
virtual void implEndStream(StreamID)=0
std::atomic< bool > workStarted_
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Worker & operator=(Worker const &)=delete
void reset()
Resets access to the resource so that added tasks will wait.
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool wantsTransition(Worker const *iWorker)
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
static bool needToRunSelection(Worker const *iWorker)
void edPrefetchAsync(WaitingTask *, ServiceToken const &, Principal const &) const
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
static bool needToRunSelection(Worker const *iWorker)
std::atomic< int > timesRun_
static bool needToRunSelection(Worker const *iWorker)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
void callWhenDoneAsync(WaitingTask *task)
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static bool wantsTransition(Worker const *iWorker)
auto wrap(F iFunc) -> decltype(iFunc())
static bool wantsTransition(Worker const *iWorker)
virtual void modulesWhoseProductsAreConsumed(std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
std::atomic< State > state_
BranchType const & branchType() const
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ModuleDescription const * descPtr() const
const ParentContext m_parentContext
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
static bool needToRunSelection(Worker const *iWorker)
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
static bool needToRunSelection(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
std::atomic< int > timesExcept_
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
static bool needToRunSelection(Worker const *iWorker)
const ParentContext m_parentContext
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
virtual TaskQueueAdaptor serializeRunModule()=0
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
static bool wantsTransition(Worker const *iWorker)
virtual bool wantsProcessBlocks() const =0
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
edm::WaitingTaskList waitingTasks_
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ServiceToken presentToken() const
StreamContext const * getStreamContext() const
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
T::TransitionInfoType m_transitionInfo
static SerialTaskQueue * pauseGlobalQueue(Worker *)
WaitingTask * m_runModuleTask
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
bool ranAcquireWithoutException_
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
void doWorkNoPrefetchingAsync(WaitingTask *, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
bool doWork(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
TaskQueueAdaptor()=default
ModuleCallingContext moduleCallingContext_
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
void respondToCloseInputFile(FileBlock const &fb)
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::atomic< int > timesFailed_
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
ModuleDescription const & description() const
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
std::exception_ptr cached_exception_
static ServiceRegistry & instance()
bool needsESPrefetching(Transition iTrans) const noexcept
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
virtual Types moduleType() const =0
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
EnableQueueGuard(EnableQueueGuard &&iGuard)
virtual bool wantsStreamRuns() const =0
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
virtual bool hasAcquire() const =0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual SerialTaskQueue * globalRunsQueue()=0
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *)
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
virtual bool hasAccumulator() const =0
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
void skipOnPath(EventPrincipal const &iEvent)
ActivityRegistry * activityRegistry()
EnableQueueGuard(SerialTaskQueue *iQueue)
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
SerialTaskQueueChain * serial_
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
static constexpr void esPrefetchAsync(Worker *, WaitingTask *, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
virtual void implBeginStream(StreamID)=0
T::Context const * m_context
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void emitPostModuleEventPrefetchingSignal()
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
static SerialTaskQueue * enableGlobalQueue(Worker *)
tbb::task * execute() override
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
WaitingTaskWithArenaHolder m_holder
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
EventTransitionInfo m_eventTransitionInfo
#define CMS_THREAD_GUARD(_var_)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
void beginStream(StreamID id, StreamContext &streamContext)
virtual bool implNeedToRunSelection() const =0
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
void endStream(StreamID id, StreamContext &streamContext)
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
virtual std::string workerType() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)