1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 63 #include <unordered_map> 68 class ProductResolverIndexHelper;
69 class ProductResolverIndexAndSkipBit;
72 class ProductRegistry;
73 class ThinnedAssociationsHelper;
75 namespace workerhelper {
91 operator bool() {
return serial_ !=
nullptr or limited_ !=
nullptr; }
118 virtual bool wantsGlobalRuns()
const = 0;
119 virtual bool wantsGlobalLuminosityBlocks()
const = 0;
120 virtual bool wantsStreamRuns()
const = 0;
121 virtual bool wantsStreamLuminosityBlocks()
const = 0;
126 template <
typename T>
127 bool doWork(
typename T::MyPrincipal
const&,
EventSetup const&
c,
130 typename T::Context
const* context);
140 void const*) {assert(
false);}
142 template <
typename T>
144 typename T::MyPrincipal
const&,
EventSetup const& c,
147 typename T::Context
const* context);
149 template <
typename T>
151 typename T::MyPrincipal
const&,
156 typename T::Context
const* context);
158 template <
typename T>
159 std::exception_ptr runModuleDirectly(
typename T::MyPrincipal
const& ep,
163 typename T::Context
const* context);
166 waitingTasks_.add(task);
179 cached_exception_ = std::exception_ptr();
181 waitingTasks_.reset();
182 workStarted_ =
false;
183 numberOfPathsLeftToRun_ = numberOfPathsOn_;
192 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
197 virtual void updateLookup(
BranchType iBranchType,
199 virtual void resolvePutIndicies(
BranchType iBranchType,
200 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const& iIndicies) = 0;
202 virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>&
modules,
204 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
208 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
210 virtual Types moduleType()
const =0;
213 timesRun_.store(0,std::memory_order_release);
214 timesVisited_.store(0,std::memory_order_release);
215 timesPassed_.store(0,std::memory_order_release);
216 timesFailed_.store(0,std::memory_order_release);
217 timesExcept_.store(0,std::memory_order_release);
224 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
225 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
226 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
227 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
228 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
233 virtual bool hasAccumulator()
const = 0;
241 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
242 virtual bool implNeedToRunSelection()
const = 0;
248 virtual bool implDoPrePrefetchSelection(
StreamID id,
267 virtual void implBeginJob() = 0;
268 virtual void implEndJob() = 0;
269 virtual void implBeginStream(
StreamID) = 0;
270 virtual void implEndStream(
StreamID) = 0;
278 template <
typename T>
279 bool runModule(
typename T::MyPrincipal
const&,
EventSetup const& c,
282 typename T::Context
const* context);
284 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
285 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
287 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFrom(
BranchType)
const = 0;
290 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
294 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
295 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
315 template<
typename T>
320 std::ostringstream iost;
329 bool shouldRethrowException(std::exception_ptr iPtr,
334 template<
bool IS_EVENT>
337 timesPassed_.fetch_add(1,std::memory_order_relaxed);
343 template<
bool IS_EVENT>
346 timesFailed_.fetch_add(1,std::memory_order_relaxed);
352 template<
bool IS_EVENT>
355 timesExcept_.fetch_add(1,std::memory_order_relaxed);
357 cached_exception_ = iException;
359 return cached_exception_;
368 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
371 virtual bool hasAcquire()
const = 0;
374 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr
const * iEPtr,
375 typename T::MyPrincipal
const& ep,
379 typename T::Context
const* context);
386 void runAcquireAfterAsyncPrefetch(std::exception_ptr
const* iEPtr,
392 std::exception_ptr handleExternalWorkException(std::exception_ptr
const* iEPtr,
395 template<
typename T>
399 typename T::MyPrincipal
const& ep,
404 typename T::Context
const* context):
408 m_streamID(streamID),
409 m_parentContext(parentContext),
411 m_serviceToken(token) {}
429 std::exception_ptr temp_excptr;
430 auto excptr = exceptionPtr();
431 if(T::isEvent_ && !m_worker->hasAcquire()) {
434 m_worker->emitPostModuleEventPrefetchingSignal();
436 temp_excptr = std::current_exception();
438 excptr = &temp_excptr;
444 if(
auto queue = m_worker->serializeRunModule()) {
448 &es, streamID = m_streamID,
449 parentContext = m_parentContext,
450 sContext = m_context, serviceToken = m_serviceToken]()
459 std::exception_ptr* ptr =
nullptr;
460 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
470 gQueue->push( [
queue,gQueue,
f]()
mutable {
480 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr,
503 template <
typename T,
typename DUMMY =
void>
507 typename T::MyPrincipal
const& ep,
512 tbb::task*
execute()
override {
return nullptr; }
515 template <
typename DUMMY>
527 m_parentContext(parentContext),
529 m_serviceToken(token) {}
537 std::exception_ptr temp_excptr;
538 auto excptr = exceptionPtr();
541 m_worker->emitPostModuleEventPrefetchingSignal();
543 temp_excptr = std::current_exception();
545 excptr = &temp_excptr;
550 if(
auto queue = m_worker->serializeRunModule()) {
554 &es, parentContext = m_parentContext,
555 serviceToken = m_serviceToken, holder = m_holder]()
560 std::exception_ptr* ptr =
nullptr;
561 worker->runAcquireAfterAsyncPrefetch(ptr,
571 m_worker->runAcquireAfterAsyncPrefetch(excptr,
625 std::atomic<
bool> workStarted_;
626 bool ranAcquireWithoutException_;
630 template <
typename T>
631 class ModuleSignalSentry {
634 typename T::Context
const* context,
636 a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
638 if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
641 ~ModuleSignalSentry() {
642 if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
647 typename T::Context
const* context_;
653 namespace workerhelper {
664 return iWorker->
implDo(ep,es, mcc);
686 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
708 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
730 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
751 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
773 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
794 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
816 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
837 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
851 template <
typename T>
853 typename T::MyPrincipal
const& ep,
858 typename T::Context
const* context) {
863 waitingTasks_.add(task);
865 timesVisited_.fetch_add(1,std::memory_order_relaxed);
868 bool expected =
false;
869 if(workStarted_.compare_exchange_strong(expected,
true)) {
877 this, ep,es,token,streamID,parentContext,context);
885 auto p = m_task.load();
892 auto t = m_task.load();
893 m_task.store(
nullptr);
897 std::atomic<edm::WaitingTask*> m_task;
900 auto ownRunTask = std::make_shared<DestroyTask>(runTask);
901 auto selectionTask =
make_waiting_task(tbb::task::allocate_root(), [ownRunTask,parentContext,&ep,token,
this] (std::exception_ptr
const* )
mutable {
904 prefetchAsync(ownRunTask->release(), token, parentContext, ep);
906 prePrefetchSelectionAsync(selectionTask,token,streamID, &ep);
909 this, ep, es, token, streamID, parentContext, context);
910 if (T::isEvent_ && hasAcquire()) {
912 new (tbb::task::allocate_root())
917 this, ep, es, token, parentContext,
std::move(runTaskHolder));
919 prefetchAsync(moduleTask, token, parentContext, ep);
926 typename T::MyPrincipal
const& ep,
930 typename T::Context
const* context) {
931 std::exception_ptr exceptionPtr;
935 if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
936 exceptionPtr = *iEPtr;
937 setException<T::isEvent_>(exceptionPtr);
939 setPassed<T::isEvent_>();
944 runModule<T>(ep,es,streamID,parentContext,context);
946 exceptionPtr = std::current_exception();
949 waitingTasks_.doneWaiting(exceptionPtr);
953 template <
typename T>
955 typename T::MyPrincipal
const&
principal,
960 typename T::Context
const* context) {
964 waitingTasks_.add(task);
965 bool expected =
false;
966 if(workStarted_.compare_exchange_strong(expected,
true)) {
968 auto toDo =[
this, &
principal, &es, streamID,parentContext,context, serviceToken]()
970 std::exception_ptr exceptionPtr;
981 exceptionPtr = std::current_exception();
983 this->waitingTasks_.doneWaiting(exceptionPtr);
985 if(
auto queue = this->serializeRunModule()) {
989 tbb::task::spawn(*task);
994 template <
typename T>
999 typename T::Context
const* context) {
1002 timesVisited_.fetch_add(1,std::memory_order_relaxed);
1008 case Pass:
return true;
1009 case Fail:
return false;
1011 std::rethrow_exception(cached_exception_);
1015 bool expected =
false;
1016 if(not workStarted_.compare_exchange_strong(expected,
true) ) {
1019 waitTask->increment_ref_count();
1021 waitingTasks_.add(waitTask.get());
1023 waitTask->wait_for_all();
1026 case Ready: assert(
false);
1027 case Pass:
return true;
1028 case Fail:
return false;
1030 std::rethrow_exception(cached_exception_);
1039 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
1045 waitTask->set_ref_count(2);
1046 prePrefetchSelectionAsync(waitTask.get(),
1049 waitTask->decrement_ref_count();
1050 waitTask->wait_for_all();
1052 if(state() !=
Ready) {
1062 auto sentryFunc = [
this](
void*) {
1063 emitPostModuleEventPrefetchingSignal();
1065 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
1068 waitTask->set_ref_count(2);
1071 waitTask->decrement_ref_count();
1072 waitTask->wait_for_all();
1074 if(waitTask->exceptionPtr() !=
nullptr) {
1076 if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1077 setException<T::isEvent_>(*waitTask->exceptionPtr());
1078 waitingTasks_.doneWaiting(cached_exception_);
1079 std::rethrow_exception(cached_exception_);
1081 setPassed<T::isEvent_>();
1082 waitingTasks_.doneWaiting(
nullptr);
1089 prefetchSentry.release();
1090 if(
auto queue = serializeRunModule()) {
1092 queue.pushAndWait([&]() {
1096 rc = runModule<T>(ep,es,streamID,parentContext,context);
1102 rc = runModule<T>(ep,es,streamID,parentContext,context);
1106 if(state_ == Exception) {
1107 waitingTasks_.doneWaiting(cached_exception_);
1108 std::rethrow_exception(cached_exception_);
1111 waitingTasks_.doneWaiting(
nullptr);
1116 template <
typename T>
1121 typename T::Context
const* context) {
1128 timesRun_.fetch_add(1,std::memory_order_relaxed);
1138 setPassed<T::isEvent_>();
1140 setFailed<T::isEvent_>();
1146 if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1147 assert(not cached_exception_);
1148 setException<T::isEvent_>(std::current_exception());
1149 std::rethrow_exception(cached_exception_);
1151 rc = setPassed<T::isEvent_>();
1158 template <
typename T>
1163 typename T::Context
const* context) {
1165 timesVisited_.fetch_add(1,std::memory_order_relaxed);
1166 std::exception_ptr
const* prefetchingException =
nullptr;
1167 return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
void push(T &&iAction)
asynchronously pushes functor iAction into queue
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
std::atomic< int > timesVisited_
static SerialTaskQueue * pauseGlobalQueue(Worker *)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
virtual bool wantsGlobalRuns() const =0
static bool needToRunSelection(Worker const *iWorker)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static bool needToRunSelection(Worker const *iWorker)
ParentContext const m_parentContext
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::atomic< int > timesExcept_
static bool wantsTransition(Worker const *iWorker)
EventPrincipal const & m_principal
static bool needToRunSelection(Worker const *iWorker)
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
WaitingTask * m_runModuleTask
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
static bool needToRunSelection(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
ActivityRegistry * activityRegistry()
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void exceptionContext(std::ostream &, GlobalContext const &)
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleCallingContext moduleCallingContext_
tbb::task * execute() override
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
AcquireTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
static bool needToRunSelection(Worker const *iWorker)
static bool needToRunSelection(Worker const *iWorker)
bool resume()
Resumes processing if the queue was paused.
static bool wantsTransition(Worker const *iWorker)
EnableQueueGuard(EnableQueueGuard &&iGuard)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool needToRunSelection(Worker const *iWorker)
virtual bool implNeedToRunSelection() const =0
static SerialTaskQueue * enableGlobalQueue(Worker *)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
ServiceToken m_serviceToken
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
EnableQueueGuard(SerialTaskQueue *iQueue)
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
static bool wantsTransition(Worker const *iWorker)
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
virtual bool wantsStreamLuminosityBlocks() const =0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
tbb::task * execute() override
std::atomic< State > state_
static SerialTaskQueue * enableGlobalQueue(Worker *)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static SerialTaskQueue * enableGlobalQueue(Worker *)
WaitingTaskWithArenaHolder m_holder
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
std::string value() const override
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void respondToOpenInputFile(FileBlock const &fb)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
virtual bool wantsGlobalLuminosityBlocks() const =0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
AcquireTask(Worker *worker, EventPrincipal const &ep, EventSetup const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual bool wantsStreamRuns() const =0
virtual ~TransitionIDValueBase()
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
ParentContext const m_parentContext
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
std::atomic< int > timesPassed_
static bool needToRunSelection(Worker const *iWorker)
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
static bool needToRunSelection(Worker const *iWorker)
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool wantsTransition(Worker const *iWorker)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
std::atomic< int > timesRun_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0