1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 59 #include <unordered_map> 64 class ProductResolverIndexHelper;
65 class ProductResolverIndexAndSkipBit;
68 class ProductRegistry;
69 class ThinnedAssociationsHelper;
72 namespace workerhelper {
88 bool doWork(
typename T::MyPrincipal
const&,
EventSetup const&
c,
91 typename T::Context
const* context);
94 typename T::MyPrincipal
const&,
EventSetup const& c,
97 typename T::Context
const* context);
101 typename T::MyPrincipal
const&,
105 typename T::Context
const* context);
108 waitingTasks_.add(task);
119 void postForkReacquireResources(
unsigned int iChildIndex,
unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
123 cached_exception_ = std::exception_ptr();
125 waitingTasks_.reset();
126 workStarted_ =
false;
127 numberOfPathsLeftToRun_ = numberOfPathsOn_;
// Declaration only; the definition is not part of this excerpt.
// Takes a std::shared_ptr<ActivityRegistry> by value.
// NOTE(review): presumably this is what populates the actReg_ member used
// when emitting signals elsewhere in this class — confirm in the .cc file.
136 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
141 virtual void updateLookup(
BranchType iBranchType,
// Pure-virtual hook: every concrete worker must provide an implementation.
// Receives a BranchType and a read-only multimap from std::string keys to
// edm::ProductResolverIndex values; returns nothing.
// NOTE(review): the meaning of the string key (presumably a module label)
// is not visible in this excerpt — confirm against the caller.
143 virtual void resolvePutIndicies(
BranchType iBranchType,
144 std::unordered_multimap<std::string, edm::ProductResolverIndex>
const& iIndicies) = 0;
146 virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>&
modules,
148 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
// Pure-virtual, const query: returns a std::vector<ConsumesInfo> by value.
// NOTE(review): presumably one entry per product the module declares it
// consumes — confirm against the concrete worker implementation.
152 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
// Pure-virtual, const query: returns a value of the Types type.
// The definition of Types is not visible in this excerpt.
154 virtual Types moduleType()
const =0;
157 timesRun_.store(0,std::memory_order_release);
158 timesVisited_.store(0,std::memory_order_release);
159 timesPassed_.store(0,std::memory_order_release);
160 timesFailed_.store(0,std::memory_order_release);
161 timesExcept_.store(0,std::memory_order_release);
168 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
169 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
170 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
171 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
172 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
182 virtual bool implDoPrePrefetchSelection(
StreamID id,
// Pure-virtual customization points; each concrete worker must override
// all four. None of them returns a value.
// NOTE(review): presumably invoked from matching public non-virtual wrappers
// (beginJob/endJob/beginStream/endStream) that are not visible here — confirm.

// Job-level hook, no arguments.
201 virtual void implBeginJob() = 0;
// Job-level hook, no arguments.
202 virtual void implEndJob() = 0;
// Stream-level hook; receives a StreamID by value.
203 virtual void implBeginStream(
StreamID) = 0;
// Stream-level hook; receives a StreamID by value.
204 virtual void implEndStream(
StreamID) = 0;
212 template <
typename T>
213 bool runModule(
typename T::MyPrincipal
const&,
EventSetup const& c,
216 typename T::Context
const* context);
// Pure-virtual, const query. Takes a BranchType and a mutable vector of
// ProductResolverIndexAndSkipBit passed by reference; returns nothing.
// NOTE(review): the vector is presumably an output that the override
// fills — confirm against an implementation.
218 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Same signature as itemsToGet().
// NOTE(review): presumably reports conditionally-read products — confirm.
219 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure-virtual, const query. Takes a BranchType and returns a const reference
// to a vector of ProductResolverIndexAndSkipBit; the referenced storage is
// owned by the implementation.
221 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFrom(
BranchType)
const = 0;
// Pure-virtual, const query. Takes no arguments and returns a const reference
// to a vector of ProductResolverIndex.
224 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
// Pure-virtual hook; receives the FileBlock by const reference.
// NOTE(review): presumably called from respondToOpenInputFile(), whose body
// is not visible in this excerpt — confirm.
228 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
// Pure-virtual hook; receives the FileBlock by const reference.
// NOTE(review): presumably called from respondToCloseInputFile() — confirm.
229 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
// Pure-virtual hook, no arguments.
// NOTE(review): presumably called from preForkReleaseResources() — confirm.
231 virtual void implPreForkReleaseResources() = 0;
// Pure-virtual hook. postForkReacquireResources() in this class forwards
// both of its arguments to this function unchanged.
232 virtual void implPostForkReacquireResources(
unsigned int iChildIndex,
233 unsigned int iNumberOfChildren) = 0;
252 template<
typename T>
257 std::ostringstream iost;
266 bool shouldRethrowException(std::exception_ptr iPtr,
271 template<
bool IS_EVENT>
274 timesPassed_.fetch_add(1,std::memory_order_relaxed);
280 template<
bool IS_EVENT>
283 timesFailed_.fetch_add(1,std::memory_order_relaxed);
289 template<
bool IS_EVENT>
292 timesExcept_.fetch_add(1,std::memory_order_relaxed);
294 cached_exception_ = iException;
296 return cached_exception_;
304 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
308 void runModuleAfterAsyncPrefetch(std::exception_ptr
const * iEPtr,
309 typename T::MyPrincipal
const& ep,
313 typename T::Context
const* context);
315 template<
typename T>
319 typename T::MyPrincipal
const& ep,
323 typename T::Context
const* context):
327 m_streamID(streamID),
328 m_parentContext(parentContext),
338 std::exception_ptr temp_excptr;
339 auto excptr = exceptionPtr();
343 m_worker->emitPostModuleEventPrefetchingSignal();
345 temp_excptr = std::current_exception();
347 excptr = &temp_excptr;
353 if(
auto queue = m_worker->serializeRunModule()) {
354 Worker* worker = m_worker;
357 auto streamID = m_streamID;
358 auto parentContext = m_parentContext;
359 auto serviceToken = m_serviceToken;
360 auto sContext = m_context;
361 queue->push( [worker, &
principal, &es, streamID,parentContext,sContext, serviceToken]()
366 std::exception_ptr* ptr =
nullptr;
378 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr,
416 std::atomic<
bool> workStarted_;
420 template <
typename T>
421 class ModuleSignalSentry {
424 typename T::Context
const* context,
426 a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
428 if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
431 ~ModuleSignalSentry() {
432 if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
437 typename T::Context
const* context_;
443 namespace workerhelper {
454 return iWorker->
implDo(ep,es, mcc);
472 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
490 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
508 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
526 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
545 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
564 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
584 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
603 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
616 template <
typename T>
618 typename T::MyPrincipal
const& ep,
622 typename T::Context
const* context) {
623 waitingTasks_.add(task);
625 timesVisited_.fetch_add(1,std::memory_order_relaxed);
628 bool expected =
false;
629 if(workStarted_.compare_exchange_strong(expected,
true)) {
635 setPassed<T::isEvent_>();
636 waitingTasks_.doneWaiting(
nullptr);
642 this, ep,es,streamID,parentContext,context);
643 prefetchAsync(runTask, parentContext, ep);
649 typename T::MyPrincipal
const& ep,
653 typename T::Context
const* context) {
654 std::exception_ptr exceptionPtr;
658 if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
659 exceptionPtr = *iEPtr;
660 setException<T::isEvent_>(exceptionPtr);
662 setPassed<T::isEvent_>();
667 runModule<T>(ep,es,streamID,parentContext,context);
669 exceptionPtr = std::current_exception();
672 waitingTasks_.doneWaiting(exceptionPtr);
675 template <
typename T>
677 typename T::MyPrincipal
const&
principal,
681 typename T::Context
const* context) {
682 waitingTasks_.add(task);
683 bool expected =
false;
684 if(workStarted_.compare_exchange_strong(expected,
true)) {
687 auto toDo =[
this, &
principal, &es, streamID,parentContext,context, serviceToken]()
689 std::exception_ptr exceptionPtr;
700 exceptionPtr = std::current_exception();
702 this->waitingTasks_.doneWaiting(exceptionPtr);
704 if(
auto queue = this->serializeRunModule()) {
708 tbb::task::spawn(*task);
713 template <
typename T>
718 typename T::Context
const* context) {
721 timesVisited_.fetch_add(1,std::memory_order_relaxed);
727 case Pass:
return true;
728 case Fail:
return false;
730 std::rethrow_exception(cached_exception_);
734 bool expected =
false;
735 if(not workStarted_.compare_exchange_strong(expected,
true) ) {
738 waitTask->increment_ref_count();
740 waitingTasks_.add(waitTask.get());
742 waitTask->wait_for_all();
745 case Ready: assert(
false);
746 case Pass:
return true;
747 case Fail:
return false;
749 std::rethrow_exception(cached_exception_);
758 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
764 timesRun_.fetch_add(1,std::memory_order_relaxed);
765 rc = setPassed<T::isEvent_>();
766 waitingTasks_.doneWaiting(
nullptr);
775 auto sentryFunc = [
this](
void*) {
776 emitPostModuleEventPrefetchingSignal();
778 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
781 waitTask->set_ref_count(2);
783 prefetchAsync(waitTask.get(),parentContext, ep);
784 waitTask->decrement_ref_count();
785 waitTask->wait_for_all();
787 if(waitTask->exceptionPtr() !=
nullptr) {
789 if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
790 setException<T::isEvent_>(*waitTask->exceptionPtr());
791 waitingTasks_.doneWaiting(cached_exception_);
792 std::rethrow_exception(cached_exception_);
794 setPassed<T::isEvent_>();
795 waitingTasks_.doneWaiting(
nullptr);
802 prefetchSentry.release();
803 if(
auto queue = serializeRunModule()) {
805 queue->pushAndWait([&]() {
809 rc = runModule<T>(ep,es,streamID,parentContext,context);
815 rc = runModule<T>(ep,es,streamID,parentContext,context);
819 if(state_ == Exception) {
820 waitingTasks_.doneWaiting(cached_exception_);
821 std::rethrow_exception(cached_exception_);
824 waitingTasks_.doneWaiting(
nullptr);
829 template <
typename T>
834 typename T::Context
const* context) {
841 timesRun_.fetch_add(1,std::memory_order_relaxed);
851 setPassed<T::isEvent_>();
853 setFailed<T::isEvent_>();
859 if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
860 assert(not cached_exception_);
861 setException<T::isEvent_>(std::current_exception());
862 std::rethrow_exception(cached_exception_);
864 rc = setPassed<T::isEvent_>();
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
std::atomic< int > timesVisited_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
virtual std::string value() const override
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static PFTauRenderPlugin instance
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
std::atomic< int > timesExcept_
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask that will be destroyed properly.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
std::atomic< State > state_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
void respondToOpenInputFile(FileBlock const &fb)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual ~TransitionIDValueBase()
void preForkReleaseResources()
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::atomic< int > timesPassed_
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
std::atomic< int > timesRun_
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)