1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 63 #include <unordered_map> 68 class ProductResolverIndexHelper;
69 class ProductResolverIndexAndSkipBit;
72 class ProductRegistry;
73 class ThinnedAssociationsHelper;
75 namespace workerhelper {
91 operator bool() {
return serial_ !=
nullptr or limited_ !=
nullptr; }
118 virtual bool wantsGlobalRuns()
const = 0;
119 virtual bool wantsGlobalLuminosityBlocks()
const = 0;
120 virtual bool wantsStreamRuns()
const = 0;
121 virtual bool wantsStreamLuminosityBlocks()
const = 0;
123 template <
typename T>
124 bool doWork(
typename T::MyPrincipal
const&,
EventSetup const&
c,
127 typename T::Context
const* context);
135 void const*) {assert(
false);}
137 template <
typename T>
139 typename T::MyPrincipal
const&,
EventSetup const& c,
142 typename T::Context
const* context);
144 template <
typename T>
146 typename T::MyPrincipal
const&,
150 typename T::Context
const* context);
152 template <
typename T>
153 std::exception_ptr runModuleDirectly(
typename T::MyPrincipal
const& ep,
157 typename T::Context
const* context);
160 waitingTasks_.add(task);
173 cached_exception_ = std::exception_ptr();
175 waitingTasks_.reset();
176 workStarted_ =
false;
177 numberOfPathsLeftToRun_ = numberOfPathsOn_;
186 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
191 virtual void updateLookup(
BranchType iBranchType,
193 virtual void resolvePutIndicies(
BranchType iBranchType,
194 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const& iIndicies) = 0;
196 virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>&
modules,
198 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
202 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
204 virtual Types moduleType()
const =0;
207 timesRun_.store(0,std::memory_order_release);
208 timesVisited_.store(0,std::memory_order_release);
209 timesPassed_.store(0,std::memory_order_release);
210 timesFailed_.store(0,std::memory_order_release);
211 timesExcept_.store(0,std::memory_order_release);
218 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
219 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
220 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
221 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
222 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
233 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
234 virtual bool implNeedToRunSelection()
const = 0;
240 virtual bool implDoPrePrefetchSelection(
StreamID id,
259 virtual void implBeginJob() = 0;
260 virtual void implEndJob() = 0;
261 virtual void implBeginStream(
StreamID) = 0;
262 virtual void implEndStream(
StreamID) = 0;
270 template <
typename T>
271 bool runModule(
typename T::MyPrincipal
const&,
EventSetup const& c,
274 typename T::Context
const* context);
276 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
277 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
279 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFrom(
BranchType)
const = 0;
282 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
286 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
287 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
307 template<
typename T>
312 std::ostringstream iost;
321 bool shouldRethrowException(std::exception_ptr iPtr,
326 template<
bool IS_EVENT>
329 timesPassed_.fetch_add(1,std::memory_order_relaxed);
335 template<
bool IS_EVENT>
338 timesFailed_.fetch_add(1,std::memory_order_relaxed);
344 template<
bool IS_EVENT>
347 timesExcept_.fetch_add(1,std::memory_order_relaxed);
349 cached_exception_ = iException;
351 return cached_exception_;
359 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
362 virtual bool hasAcquire()
const = 0;
365 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr
const * iEPtr,
366 typename T::MyPrincipal
const& ep,
370 typename T::Context
const* context);
377 void runAcquireAfterAsyncPrefetch(std::exception_ptr
const* iEPtr,
383 std::exception_ptr handleExternalWorkException(std::exception_ptr
const* iEPtr,
386 template<
typename T>
390 typename T::MyPrincipal
const& ep,
394 typename T::Context
const* context):
398 m_streamID(streamID),
399 m_parentContext(parentContext),
409 std::exception_ptr temp_excptr;
410 auto excptr = exceptionPtr();
411 if(T::isEvent_ && !m_worker->hasAcquire()) {
414 m_worker->emitPostModuleEventPrefetchingSignal();
416 temp_excptr = std::current_exception();
418 excptr = &temp_excptr;
424 if(
auto queue = m_worker->serializeRunModule()) {
428 &es, streamID = m_streamID,
429 parentContext = m_parentContext,
430 sContext = m_context, serviceToken = m_serviceToken]()
435 std::exception_ptr* ptr =
nullptr;
436 worker->template runModuleAfterAsyncPrefetch<T>(ptr,
447 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr,
470 template <
typename T,
typename DUMMY =
void>
474 typename T::MyPrincipal
const& ep,
478 tbb::task*
execute()
override {
return nullptr; }
481 template <
typename DUMMY>
492 m_parentContext(parentContext),
502 std::exception_ptr temp_excptr;
503 auto excptr = exceptionPtr();
506 m_worker->emitPostModuleEventPrefetchingSignal();
508 temp_excptr = std::current_exception();
510 excptr = &temp_excptr;
515 if(
auto queue = m_worker->serializeRunModule()) {
519 &es, parentContext = m_parentContext,
520 serviceToken = m_serviceToken, holder = m_holder]()
525 std::exception_ptr* ptr =
nullptr;
526 worker->runAcquireAfterAsyncPrefetch(ptr,
536 m_worker->runAcquireAfterAsyncPrefetch(excptr,
590 std::atomic<
bool> workStarted_;
591 bool ranAcquireWithoutException_;
595 template <
typename T>
596 class ModuleSignalSentry {
599 typename T::Context
const* context,
601 a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
603 if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
606 ~ModuleSignalSentry() {
607 if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
612 typename T::Context
const* context_;
618 namespace workerhelper {
629 return iWorker->
implDo(ep,es, mcc);
648 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
667 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
686 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
705 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
725 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
744 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
764 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
783 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
795 template <
typename T>
797 typename T::MyPrincipal
const& ep,
801 typename T::Context
const* context) {
806 waitingTasks_.add(task);
808 timesVisited_.fetch_add(1,std::memory_order_relaxed);
811 bool expected =
false;
812 if(workStarted_.compare_exchange_strong(expected,
true)) {
820 this, ep,es,streamID,parentContext,context);
828 auto p = m_task.load();
835 auto t = m_task.load();
836 m_task.store(
nullptr);
840 std::atomic<edm::WaitingTask*> m_task;
843 auto ownRunTask = std::make_shared<DestroyTask>(runTask);
845 auto selectionTask =
make_waiting_task(tbb::task::allocate_root(), [ownRunTask,parentContext,&ep,token,
this] (std::exception_ptr
const* )
mutable {
848 prefetchAsync(ownRunTask->release(), parentContext, ep);
850 prePrefetchSelectionAsync(selectionTask,streamID, &ep);
853 this, ep, es, streamID, parentContext, context);
854 if (T::isEvent_ && hasAcquire()) {
856 new (tbb::task::allocate_root())
861 this, ep, es, parentContext,
std::move(runTaskHolder));
863 prefetchAsync(moduleTask, parentContext, ep);
870 typename T::MyPrincipal
const& ep,
874 typename T::Context
const* context) {
875 std::exception_ptr exceptionPtr;
879 if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
880 exceptionPtr = *iEPtr;
881 setException<T::isEvent_>(exceptionPtr);
883 setPassed<T::isEvent_>();
888 runModule<T>(ep,es,streamID,parentContext,context);
890 exceptionPtr = std::current_exception();
893 waitingTasks_.doneWaiting(exceptionPtr);
897 template <
typename T>
899 typename T::MyPrincipal
const&
principal,
903 typename T::Context
const* context) {
907 waitingTasks_.add(task);
908 bool expected =
false;
909 if(workStarted_.compare_exchange_strong(expected,
true)) {
912 auto toDo =[
this, &
principal, &es, streamID,parentContext,context, serviceToken]()
914 std::exception_ptr exceptionPtr;
925 exceptionPtr = std::current_exception();
927 this->waitingTasks_.doneWaiting(exceptionPtr);
929 if(
auto queue = this->serializeRunModule()) {
933 tbb::task::spawn(*task);
938 template <
typename T>
943 typename T::Context
const* context) {
946 timesVisited_.fetch_add(1,std::memory_order_relaxed);
952 case Pass:
return true;
953 case Fail:
return false;
955 std::rethrow_exception(cached_exception_);
959 bool expected =
false;
960 if(not workStarted_.compare_exchange_strong(expected,
true) ) {
963 waitTask->increment_ref_count();
965 waitingTasks_.add(waitTask.get());
967 waitTask->wait_for_all();
970 case Ready: assert(
false);
971 case Pass:
return true;
972 case Fail:
return false;
974 std::rethrow_exception(cached_exception_);
983 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
989 waitTask->set_ref_count(2);
990 prePrefetchSelectionAsync(waitTask.get(), streamID, &ep);
991 waitTask->decrement_ref_count();
992 waitTask->wait_for_all();
994 if(state() !=
Ready) {
1004 auto sentryFunc = [
this](
void*) {
1005 emitPostModuleEventPrefetchingSignal();
1007 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
1010 waitTask->set_ref_count(2);
1012 prefetchAsync(waitTask.get(),parentContext, ep);
1013 waitTask->decrement_ref_count();
1014 waitTask->wait_for_all();
1016 if(waitTask->exceptionPtr() !=
nullptr) {
1018 if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1019 setException<T::isEvent_>(*waitTask->exceptionPtr());
1020 waitingTasks_.doneWaiting(cached_exception_);
1021 std::rethrow_exception(cached_exception_);
1023 setPassed<T::isEvent_>();
1024 waitingTasks_.doneWaiting(
nullptr);
1031 prefetchSentry.release();
1032 if(
auto queue = serializeRunModule()) {
1034 queue.pushAndWait([&]() {
1038 rc = runModule<T>(ep,es,streamID,parentContext,context);
1044 rc = runModule<T>(ep,es,streamID,parentContext,context);
1048 if(state_ == Exception) {
1049 waitingTasks_.doneWaiting(cached_exception_);
1050 std::rethrow_exception(cached_exception_);
1053 waitingTasks_.doneWaiting(
nullptr);
1058 template <
typename T>
1063 typename T::Context
const* context) {
1070 timesRun_.fetch_add(1,std::memory_order_relaxed);
1080 setPassed<T::isEvent_>();
1082 setFailed<T::isEvent_>();
1088 if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1089 assert(not cached_exception_);
1090 setException<T::isEvent_>(std::current_exception());
1091 std::rethrow_exception(cached_exception_);
1093 rc = setPassed<T::isEvent_>();
1100 template <
typename T>
1105 typename T::Context
const* context) {
1107 timesVisited_.fetch_add(1,std::memory_order_relaxed);
1108 std::exception_ptr
const* prefetchingException =
nullptr;
1109 return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
void push(T &&iAction)
asynchronously pushes functor iAction into queue
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
std::atomic< int > timesVisited_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
virtual bool wantsGlobalRuns() const =0
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static bool needToRunSelection(Worker const *iWorker)
static PFTauRenderPlugin instance
ParentContext const m_parentContext
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
std::atomic< int > timesExcept_
static bool wantsTransition(Worker const *iWorker)
EventPrincipal const & m_principal
static bool needToRunSelection(Worker const *iWorker)
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
WaitingTask * m_runModuleTask
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void prePrefetchSelectionAsync(WaitingTask *task, StreamID stream, void const *)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
static bool needToRunSelection(Worker const *iWorker)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
void push(T &&iAction)
asynchronously pushes functor iAction into queue
ActivityRegistry * activityRegistry()
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void exceptionContext(std::ostream &, GlobalContext const &)
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleCallingContext moduleCallingContext_
tbb::task * execute() override
static bool needToRunSelection(Worker const *iWorker)
static bool needToRunSelection(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool needToRunSelection(Worker const *iWorker)
virtual bool implNeedToRunSelection() const =0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
ServiceToken m_serviceToken
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
static bool wantsTransition(Worker const *iWorker)
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
virtual bool wantsStreamLuminosityBlocks() const =0
tbb::task * execute() override
std::atomic< State > state_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
WaitingTaskWithArenaHolder m_holder
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
static bool wantsTransition(Worker const *iWorker)
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
std::string value() const override
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void respondToOpenInputFile(FileBlock const &fb)
virtual bool wantsGlobalLuminosityBlocks() const =0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
AcquireTask(Worker *worker, EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual bool wantsStreamRuns() const =0
virtual ~TransitionIDValueBase()
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
ParentContext const m_parentContext
std::atomic< int > timesPassed_
static bool needToRunSelection(Worker const *iWorker)
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
static bool needToRunSelection(Worker const *iWorker)
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
AcquireTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool wantsTransition(Worker const *iWorker)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
std::atomic< int > timesRun_
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken