1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 59 #include <unordered_map> 64 class ProductResolverIndexHelper;
65 class ProductResolverIndexAndSkipBit;
68 class ProductRegistry;
69 class ThinnedAssociationsHelper;
72 namespace workerhelper {
88 bool doWork(
typename T::MyPrincipal
const&,
EventSetup const&
c,
91 typename T::Context
const* context);
94 typename T::MyPrincipal
const&,
EventSetup const& c,
97 typename T::Context
const* context);
101 typename T::MyPrincipal
const&,
105 typename T::Context
const* context);
108 waitingTasks_.add(task);
119 void postForkReacquireResources(
unsigned int iChildIndex,
unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
123 cached_exception_ = std::exception_ptr();
125 waitingTasks_.reset();
126 workStarted_ =
false;
127 numberOfPathsLeftToRun_ = numberOfPathsOn_;
// Declaration only; no definition is visible in this part of the file.
// The registry is received as a std::shared_ptr passed by value.
// NOTE(review): presumably stored in actReg_ -- confirm in the definition.
136 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
141 virtual void updateLookup(
BranchType iBranchType,
143 virtual void resolvePutIndicies(
BranchType iBranchType,
144 std::unordered_multimap<std::string, edm::ProductResolverIndex>
const& iIndicies) = 0;
146 virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>&
modules,
148 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
150 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
152 virtual Types moduleType()
const =0;
155 timesRun_.store(0,std::memory_order_release);
156 timesVisited_.store(0,std::memory_order_release);
157 timesPassed_.store(0,std::memory_order_release);
158 timesFailed_.store(0,std::memory_order_release);
159 timesExcept_.store(0,std::memory_order_release);
166 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
167 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
168 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
169 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
170 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
180 virtual bool implDoPrePrefetchSelection(
StreamID id,
199 virtual void implBeginJob() = 0;
200 virtual void implEndJob() = 0;
201 virtual void implBeginStream(
StreamID) = 0;
202 virtual void implEndStream(
StreamID) = 0;
210 template <
typename T>
211 bool runModule(
typename T::MyPrincipal
const&,
EventSetup const& c,
214 typename T::Context
const* context);
216 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
217 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
219 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFromEvent()
const = 0;
221 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
225 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
226 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
228 virtual void implPreForkReleaseResources() = 0;
229 virtual void implPostForkReacquireResources(
unsigned int iChildIndex,
230 unsigned int iNumberOfChildren) = 0;
249 template<
typename T>
254 std::ostringstream iost;
263 bool shouldRethrowException(std::exception_ptr iPtr,
268 template<
bool IS_EVENT>
271 timesPassed_.fetch_add(1,std::memory_order_relaxed);
277 template<
bool IS_EVENT>
280 timesFailed_.fetch_add(1,std::memory_order_relaxed);
286 template<
bool IS_EVENT>
289 timesExcept_.fetch_add(1,std::memory_order_relaxed);
291 cached_exception_ = iException;
293 return cached_exception_;
301 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
305 void runModuleAfterAsyncPrefetch(std::exception_ptr
const * iEPtr,
306 typename T::MyPrincipal
const& ep,
310 typename T::Context
const* context);
312 template<
typename T>
316 typename T::MyPrincipal
const& ep,
320 typename T::Context
const* context):
324 m_streamID(streamID),
325 m_parentContext(parentContext),
335 std::exception_ptr temp_excptr;
336 auto excptr = exceptionPtr();
339 m_worker->emitPostModuleEventPrefetchingSignal();
341 temp_excptr = std::current_exception();
343 excptr = &temp_excptr;
348 if(
auto queue = m_worker->serializeRunModule()) {
349 Worker* worker = m_worker;
352 auto streamID = m_streamID;
353 auto parentContext = m_parentContext;
354 auto serviceToken = m_serviceToken;
355 auto sContext = m_context;
356 queue->push( [worker, &
principal, &es, streamID,parentContext,sContext, serviceToken]()
361 std::exception_ptr* ptr =
nullptr;
373 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr,
411 std::atomic<
bool> workStarted_;
415 template <
typename T>
416 class ModuleSignalSentry {
419 typename T::Context
const* context,
421 a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
423 if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
426 ~ModuleSignalSentry() {
427 if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
432 typename T::Context
const* context_;
438 namespace workerhelper {
449 return iWorker->
implDo(ep,es, mcc);
467 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
485 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
503 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
521 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
540 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
559 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
579 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
598 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
611 template <
typename T>
613 typename T::MyPrincipal
const& ep,
617 typename T::Context
const* context) {
618 waitingTasks_.add(task);
620 timesVisited_.fetch_add(1,std::memory_order_relaxed);
623 bool expected =
false;
624 if(workStarted_.compare_exchange_strong(expected,
true)) {
630 setPassed<T::isEvent_>();
631 waitingTasks_.doneWaiting(
nullptr);
637 this, ep,es,streamID,parentContext,context);
638 prefetchAsync(runTask, parentContext, ep);
644 typename T::MyPrincipal
const& ep,
648 typename T::Context
const* context) {
649 std::exception_ptr exceptionPtr;
653 if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
654 exceptionPtr = *iEPtr;
655 setException<T::isEvent_>(exceptionPtr);
657 setPassed<T::isEvent_>();
662 runModule<T>(ep,es,streamID,parentContext,context);
664 exceptionPtr = std::current_exception();
667 waitingTasks_.doneWaiting(exceptionPtr);
670 template <
typename T>
672 typename T::MyPrincipal
const&
principal,
676 typename T::Context
const* context) {
677 waitingTasks_.add(task);
678 bool expected =
false;
679 if(workStarted_.compare_exchange_strong(expected,
true)) {
682 auto toDo =[
this, &
principal, &es, streamID,parentContext,context, serviceToken]()
684 std::exception_ptr exceptionPtr;
695 exceptionPtr = std::current_exception();
697 this->waitingTasks_.doneWaiting(exceptionPtr);
699 if(
auto queue = this->serializeRunModule()) {
703 tbb::task::spawn(*task);
708 template <
typename T>
713 typename T::Context
const* context) {
716 timesVisited_.fetch_add(1,std::memory_order_relaxed);
722 case Pass:
return true;
723 case Fail:
return false;
725 std::rethrow_exception(cached_exception_);
729 bool expected =
false;
730 if(not workStarted_.compare_exchange_strong(expected,
true) ) {
733 waitTask->increment_ref_count();
735 waitingTasks_.add(waitTask.get());
737 waitTask->wait_for_all();
740 case Ready: assert(
false);
741 case Pass:
return true;
742 case Fail:
return false;
744 std::rethrow_exception(cached_exception_);
753 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
759 timesRun_.fetch_add(1,std::memory_order_relaxed);
760 rc = setPassed<T::isEvent_>();
761 waitingTasks_.doneWaiting(
nullptr);
770 auto sentryFunc = [
this](
void*) {
771 emitPostModuleEventPrefetchingSignal();
773 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
776 waitTask->set_ref_count(2);
778 prefetchAsync(waitTask.get(),parentContext, ep);
779 waitTask->decrement_ref_count();
780 waitTask->wait_for_all();
782 if(waitTask->exceptionPtr() !=
nullptr) {
784 if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
785 setException<T::isEvent_>(*waitTask->exceptionPtr());
786 waitingTasks_.doneWaiting(cached_exception_);
787 std::rethrow_exception(cached_exception_);
789 setPassed<T::isEvent_>();
790 waitingTasks_.doneWaiting(
nullptr);
797 prefetchSentry.release();
798 if(
auto queue = serializeRunModule()) {
800 queue->pushAndWait([&]() {
804 rc = runModule<T>(ep,es,streamID,parentContext,context);
810 rc = runModule<T>(ep,es,streamID,parentContext,context);
814 if(state_ == Exception) {
815 waitingTasks_.doneWaiting(cached_exception_);
816 std::rethrow_exception(cached_exception_);
819 waitingTasks_.doneWaiting(
nullptr);
824 template <
typename T>
829 typename T::Context
const* context) {
836 timesRun_.fetch_add(1,std::memory_order_relaxed);
846 setPassed<T::isEvent_>();
848 setFailed<T::isEvent_>();
854 if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
855 assert(not cached_exception_);
856 setException<T::isEvent_>(std::current_exception());
857 std::rethrow_exception(cached_exception_);
859 rc = setPassed<T::isEvent_>();
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
std::atomic< int > timesVisited_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
virtual std::string value() const override
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static PFTauRenderPlugin instance
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
std::atomic< int > timesExcept_
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
std::atomic< State > state_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
void respondToOpenInputFile(FileBlock const &fb)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual ~TransitionIDValueBase()
void preForkReleaseResources()
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::atomic< int > timesPassed_
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
std::atomic< int > timesRun_
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)