1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 60 #include <unordered_map> 65 class ProductResolverIndexHelper;
66 class ProductResolverIndexAndSkipBit;
69 class ProductRegistry;
70 class ThinnedAssociationsHelper;
73 namespace workerhelper {
89 bool doWork(
typename T::MyPrincipal
const&,
EventSetup const&
c,
92 typename T::Context
const* context);
95 typename T::MyPrincipal
const&,
EventSetup const& c,
98 typename T::Context
const* context);
100 template <
typename T>
102 typename T::MyPrincipal
const&,
106 typename T::Context
const* context);
108 template <
typename T>
109 std::exception_ptr runModuleDirectly(
typename T::MyPrincipal
const& ep,
113 typename T::Context
const* context);
116 waitingTasks_.add(task);
129 cached_exception_ = std::exception_ptr();
131 waitingTasks_.reset();
132 workStarted_ =
false;
133 numberOfPathsLeftToRun_ = numberOfPathsOn_;
// Declaration only; the definition is not part of this header view.
// Takes the ActivityRegistry as a std::shared_ptr passed by value.
// NOTE(review): presumably this is what populates the actReg_ member that
// other code in this header dereferences — confirm in the implementation.
142 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
147 virtual void updateLookup(
BranchType iBranchType,
// Pure virtual hook that every concrete worker must implement.
// iBranchType: the branch type the supplied indices refer to.
// iIndicies:   read-only multimap from a string key to
//              edm::ProductResolverIndex values (several per key allowed).
// NOTE(review): what the string key holds is not visible here — confirm
// against the implementers.
149 virtual void resolvePutIndicies(
BranchType iBranchType,
150 std::unordered_multimap<std::string, edm::ProductResolverIndex>
const& iIndicies) = 0;
152 virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>&
modules,
154 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
// Pure virtual, const: implementers return a std::vector<ConsumesInfo>
// by value.
158 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
// Pure virtual, const: implementers report a value of type Types.
// NOTE(review): Types is declared outside the lines visible here.
160 virtual Types moduleType()
const =0;
163 timesRun_.store(0,std::memory_order_release);
164 timesVisited_.store(0,std::memory_order_release);
165 timesPassed_.store(0,std::memory_order_release);
166 timesFailed_.store(0,std::memory_order_release);
167 timesExcept_.store(0,std::memory_order_release);
174 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
175 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
176 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
177 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
178 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
188 virtual bool implDoPrePrefetchSelection(
StreamID id,
// Pure virtual hook taking no arguments; each concrete worker supplies it.
// NOTE(review): presumably invoked from a non-virtual begin-job entry point
// that is not visible here — confirm.
207 virtual void implBeginJob() = 0;
// Pure virtual hook taking no arguments; each concrete worker supplies it.
// NOTE(review): presumably invoked from a non-virtual end-job entry point
// that is not visible here — confirm.
208 virtual void implEndJob() = 0;
// Pure virtual hook; receives the StreamID by value.
// NOTE(review): presumably the per-stream begin counterpart of
// implBeginJob — confirm against the caller.
209 virtual void implBeginStream(
StreamID) = 0;
// Pure virtual hook; receives the StreamID by value.
// NOTE(review): presumably the per-stream end counterpart of
// implEndJob — confirm against the caller.
210 virtual void implEndStream(
StreamID) = 0;
218 template <
typename T>
219 bool runModule(
typename T::MyPrincipal
const&,
EventSetup const& c,
222 typename T::Context
const* context);
// Pure virtual, const. Takes a BranchType and a mutable reference to a
// vector of ProductResolverIndexAndSkipBit, i.e. an output parameter.
// NOTE(review): whether implementers append to or overwrite the vector is
// not visible here — confirm.
224 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure virtual, const. Same signature shape as itemsToGet: a BranchType plus
// a mutable vector of ProductResolverIndexAndSkipBit used as output.
// NOTE(review): the distinction between "to get" and "may get" is not
// established by the visible code — confirm against the implementers.
225 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure virtual, const. For the given BranchType, returns a const reference
// to a vector of ProductResolverIndexAndSkipBit; because a reference is
// returned, the implementer must keep the vector alive beyond the call.
227 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFrom(
BranchType)
const = 0;
// Pure virtual, const. Returns a const reference to a vector of
// ProductResolverIndex; because a reference is returned, the implementer
// must keep the vector alive beyond the call.
230 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
// Pure virtual hook; receives the FileBlock by const reference.
// NOTE(review): presumably forwarded to from the public
// respondToOpenInputFile(FileBlock const&) — confirm.
234 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
// Pure virtual hook; receives the FileBlock by const reference.
// NOTE(review): presumably forwarded to from the public
// respondToCloseInputFile(FileBlock const&) — confirm.
235 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
255 template<
typename T>
260 std::ostringstream iost;
269 bool shouldRethrowException(std::exception_ptr iPtr,
274 template<
bool IS_EVENT>
277 timesPassed_.fetch_add(1,std::memory_order_relaxed);
283 template<
bool IS_EVENT>
286 timesFailed_.fetch_add(1,std::memory_order_relaxed);
292 template<
bool IS_EVENT>
295 timesExcept_.fetch_add(1,std::memory_order_relaxed);
297 cached_exception_ = iException;
299 return cached_exception_;
307 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
311 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr
const * iEPtr,
312 typename T::MyPrincipal
const& ep,
316 typename T::Context
const* context);
318 template<
typename T>
322 typename T::MyPrincipal
const& ep,
326 typename T::Context
const* context):
330 m_streamID(streamID),
331 m_parentContext(parentContext),
341 std::exception_ptr temp_excptr;
342 auto excptr = exceptionPtr();
346 m_worker->emitPostModuleEventPrefetchingSignal();
348 temp_excptr = std::current_exception();
350 excptr = &temp_excptr;
356 if(
auto queue = m_worker->serializeRunModule()) {
357 Worker* worker = m_worker;
360 auto streamID = m_streamID;
361 auto parentContext = m_parentContext;
362 auto serviceToken = m_serviceToken;
363 auto sContext = m_context;
364 queue->push( [worker, &
principal, &es, streamID,parentContext,sContext, serviceToken]()
369 std::exception_ptr* ptr =
nullptr;
381 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr,
// Atomic flag. Other code in this header assigns it false and flips it
// false -> true with compare_exchange_strong before proceeding.
// NOTE(review): presumably this ensures only one caller starts the module's
// work per transition — confirm against the full doWork* bodies.
419 std::atomic<
bool> workStarted_;
423 template <
typename T>
424 class ModuleSignalSentry {
427 typename T::Context
const* context,
429 a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
431 if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
434 ~ModuleSignalSentry() {
435 if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
440 typename T::Context
const* context_;
446 namespace workerhelper {
457 return iWorker->
implDo(ep,es, mcc);
475 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
493 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
511 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
529 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
548 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
567 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
587 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
606 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
619 template <
typename T>
621 typename T::MyPrincipal
const& ep,
625 typename T::Context
const* context) {
626 waitingTasks_.add(task);
628 timesVisited_.fetch_add(1,std::memory_order_relaxed);
631 bool expected =
false;
632 if(workStarted_.compare_exchange_strong(expected,
true)) {
638 setPassed<T::isEvent_>();
639 waitingTasks_.doneWaiting(
nullptr);
645 this, ep,es,streamID,parentContext,context);
646 prefetchAsync(runTask, parentContext, ep);
652 typename T::MyPrincipal
const& ep,
656 typename T::Context
const* context) {
657 std::exception_ptr exceptionPtr;
661 if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
662 exceptionPtr = *iEPtr;
663 setException<T::isEvent_>(exceptionPtr);
665 setPassed<T::isEvent_>();
670 runModule<T>(ep,es,streamID,parentContext,context);
672 exceptionPtr = std::current_exception();
675 waitingTasks_.doneWaiting(exceptionPtr);
679 template <
typename T>
681 typename T::MyPrincipal
const&
principal,
685 typename T::Context
const* context) {
686 waitingTasks_.add(task);
687 bool expected =
false;
688 if(workStarted_.compare_exchange_strong(expected,
true)) {
691 auto toDo =[
this, &
principal, &es, streamID,parentContext,context, serviceToken]()
693 std::exception_ptr exceptionPtr;
704 exceptionPtr = std::current_exception();
706 this->waitingTasks_.doneWaiting(exceptionPtr);
708 if(
auto queue = this->serializeRunModule()) {
712 tbb::task::spawn(*task);
717 template <
typename T>
722 typename T::Context
const* context) {
725 timesVisited_.fetch_add(1,std::memory_order_relaxed);
731 case Pass:
return true;
732 case Fail:
return false;
734 std::rethrow_exception(cached_exception_);
738 bool expected =
false;
739 if(not workStarted_.compare_exchange_strong(expected,
true) ) {
742 waitTask->increment_ref_count();
744 waitingTasks_.add(waitTask.get());
746 waitTask->wait_for_all();
749 case Ready: assert(
false);
750 case Pass:
return true;
751 case Fail:
return false;
753 std::rethrow_exception(cached_exception_);
762 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
768 timesRun_.fetch_add(1,std::memory_order_relaxed);
769 rc = setPassed<T::isEvent_>();
770 waitingTasks_.doneWaiting(
nullptr);
779 auto sentryFunc = [
this](
void*) {
780 emitPostModuleEventPrefetchingSignal();
782 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
785 waitTask->set_ref_count(2);
787 prefetchAsync(waitTask.get(),parentContext, ep);
788 waitTask->decrement_ref_count();
789 waitTask->wait_for_all();
791 if(waitTask->exceptionPtr() !=
nullptr) {
793 if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
794 setException<T::isEvent_>(*waitTask->exceptionPtr());
795 waitingTasks_.doneWaiting(cached_exception_);
796 std::rethrow_exception(cached_exception_);
798 setPassed<T::isEvent_>();
799 waitingTasks_.doneWaiting(
nullptr);
806 prefetchSentry.release();
807 if(
auto queue = serializeRunModule()) {
809 queue->pushAndWait([&]() {
813 rc = runModule<T>(ep,es,streamID,parentContext,context);
819 rc = runModule<T>(ep,es,streamID,parentContext,context);
823 if(state_ == Exception) {
824 waitingTasks_.doneWaiting(cached_exception_);
825 std::rethrow_exception(cached_exception_);
828 waitingTasks_.doneWaiting(
nullptr);
833 template <
typename T>
838 typename T::Context
const* context) {
845 timesRun_.fetch_add(1,std::memory_order_relaxed);
855 setPassed<T::isEvent_>();
857 setFailed<T::isEvent_>();
863 if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
864 assert(not cached_exception_);
865 setException<T::isEvent_>(std::current_exception());
866 std::rethrow_exception(cached_exception_);
868 rc = setPassed<T::isEvent_>();
875 template <
typename T>
880 typename T::Context
const* context) {
882 timesVisited_.fetch_add(1,std::memory_order_relaxed);
883 std::exception_ptr
const* prefetchingException =
nullptr;
884 return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
std::atomic< int > timesVisited_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
virtual std::string value() const override
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static PFTauRenderPlugin instance
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
std::atomic< int > timesExcept_
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleCallingContext moduleCallingContext_
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
std::atomic< State > state_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void respondToOpenInputFile(FileBlock const &fb)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual ~TransitionIDValueBase()
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::atomic< int > timesPassed_
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
std::atomic< int > timesRun_
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)