1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 59 #include <unordered_map> 64 class ProductResolverIndexHelper;
65 class ProductResolverIndexAndSkipBit;
68 class ProductRegistry;
69 class ThinnedAssociationsHelper;
72 namespace workerhelper {
88 bool doWork(
typename T::MyPrincipal
const&,
EventSetup const&
c,
91 typename T::Context
const* context);
94 typename T::MyPrincipal
const&,
EventSetup const& c,
97 typename T::Context
const* context);
101 typename T::MyPrincipal
const&,
105 typename T::Context
const* context);
108 waitingTasks_.add(task);
// Non-virtual entry point: forwards iChildIndex and iNumberOfChildren unchanged
// to the pure virtual implPostForkReacquireResources() hook declared in this class.
119 void postForkReacquireResources(
unsigned int iChildIndex,
unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
123 cached_exception_ = std::exception_ptr();
125 waitingTasks_.reset();
126 workStarted_ =
false;
127 numberOfPathsLeftToRun_ = numberOfPathsOn_;
// Declaration only; the definition is not part of this listing.
// Takes the ActivityRegistry as a std::shared_ptr passed by value.
// NOTE(review): presumably this is what populates the actReg_ member used elsewhere in this file - confirm.
136 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
141 virtual void updateLookup(
BranchType iBranchType,
// Pure virtual: every concrete worker must override it.
// Receives a BranchType and a read-only multimap from std::string keys to
// edm::ProductResolverIndex values.
// NOTE(review): the meaning of the string key is not visible in this listing - confirm against the caller.
143 virtual void resolvePutIndicies(
BranchType iBranchType,
144 std::unordered_multimap<std::string, edm::ProductResolverIndex>
const& iIndicies) = 0;
146 virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>&
modules,
148 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
// Pure virtual, const: returns a std::vector<ConsumesInfo> by value.
150 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
// Pure virtual, const: returns a value of type Types.
// NOTE(review): the enumerators of Types are not visible in this listing.
152 virtual Types moduleType()
const =0;
155 timesRun_.store(0,std::memory_order_release);
156 timesVisited_.store(0,std::memory_order_release);
157 timesPassed_.store(0,std::memory_order_release);
158 timesFailed_.store(0,std::memory_order_release);
159 timesExcept_.store(0,std::memory_order_release);
// Returns the current value of the timesRun_ atomic counter.
// The load uses acquire ordering; elsewhere in this file the counter is zeroed
// with a release store and bumped with a relaxed fetch_add.
166 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
// Returns the current value of the timesVisited_ atomic counter.
// The load uses acquire ordering; elsewhere in this file the counter is zeroed
// with a release store and bumped with a relaxed fetch_add.
167 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
// Returns the current value of the timesPassed_ atomic counter.
// The load uses acquire ordering; elsewhere in this file the counter is zeroed
// with a release store and bumped with a relaxed fetch_add.
168 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
// Returns the current value of the timesFailed_ atomic counter.
// The load uses acquire ordering; elsewhere in this file the counter is zeroed
// with a release store and bumped with a relaxed fetch_add.
169 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
// Returns the current value of the timesExcept_ atomic counter.
// The load uses acquire ordering; elsewhere in this file the counter is zeroed
// with a release store and bumped with a relaxed fetch_add.
170 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
180 virtual bool implDoPrePrefetchSelection(
StreamID id,
// Pure virtual hook with no arguments; concrete workers must override it.
// NOTE(review): presumably invoked from a non-virtual begin-job entry point not shown in this listing - confirm.
199 virtual void implBeginJob() = 0;
// Pure virtual hook with no arguments; concrete workers must override it.
// NOTE(review): presumably invoked from a non-virtual end-job entry point not shown in this listing - confirm.
200 virtual void implEndJob() = 0;
// Pure virtual hook taking a StreamID by value; concrete workers must override it.
201 virtual void implBeginStream(
StreamID) = 0;
// Pure virtual hook taking a StreamID by value; concrete workers must override it.
202 virtual void implEndStream(
StreamID) = 0;
210 template <
typename T>
211 bool runModule(
typename T::MyPrincipal
const&,
EventSetup const& c,
214 typename T::Context
const* context);
// Pure virtual, const. Takes a BranchType and a non-const reference to a
// std::vector<ProductResolverIndexAndSkipBit>.
// NOTE(review): the vector looks like an out-parameter filled by the override - confirm.
216 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure virtual, const. Same parameter list as itemsToGet(): a BranchType and a
// non-const reference to a std::vector<ProductResolverIndexAndSkipBit>.
// NOTE(review): the vector looks like an out-parameter filled by the override - confirm.
217 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
// Pure virtual, const. For the given BranchType, returns a const reference to a
// std::vector<ProductResolverIndexAndSkipBit>; the referenced vector is owned by the implementation.
219 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFrom(
BranchType)
const = 0;
// Pure virtual, const. Returns a const reference to a
// std::vector<ProductResolverIndex>; the referenced vector is owned by the implementation.
222 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
// Pure virtual hook receiving the FileBlock by const reference; concrete workers must override it.
// NOTE(review): presumably called by the public respondToOpenInputFile(FileBlock const&) - its body is not shown here.
226 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
// Pure virtual hook receiving the FileBlock by const reference; concrete workers must override it.
// NOTE(review): presumably called by the public respondToCloseInputFile(FileBlock const&) - its body is not shown here.
227 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
// Pure virtual hook with no arguments; concrete workers must override it.
// NOTE(review): presumably called by the public preForkReleaseResources() - its body is not shown here.
229 virtual void implPreForkReleaseResources() = 0;
// Pure virtual hook; concrete workers must override it.
// Called by postForkReacquireResources(), which passes both arguments through unchanged.
230 virtual void implPostForkReacquireResources(
unsigned int iChildIndex,
231 unsigned int iNumberOfChildren) = 0;
250 template<
typename T>
255 std::ostringstream iost;
264 bool shouldRethrowException(std::exception_ptr iPtr,
269 template<
bool IS_EVENT>
272 timesPassed_.fetch_add(1,std::memory_order_relaxed);
278 template<
bool IS_EVENT>
281 timesFailed_.fetch_add(1,std::memory_order_relaxed);
287 template<
bool IS_EVENT>
290 timesExcept_.fetch_add(1,std::memory_order_relaxed);
292 cached_exception_ = iException;
294 return cached_exception_;
302 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
306 void runModuleAfterAsyncPrefetch(std::exception_ptr
const * iEPtr,
307 typename T::MyPrincipal
const& ep,
311 typename T::Context
const* context);
313 template<
typename T>
317 typename T::MyPrincipal
const& ep,
321 typename T::Context
const* context):
325 m_streamID(streamID),
326 m_parentContext(parentContext),
336 std::exception_ptr temp_excptr;
337 auto excptr = exceptionPtr();
341 m_worker->emitPostModuleEventPrefetchingSignal();
343 temp_excptr = std::current_exception();
345 excptr = &temp_excptr;
351 if(
auto queue = m_worker->serializeRunModule()) {
352 Worker* worker = m_worker;
355 auto streamID = m_streamID;
356 auto parentContext = m_parentContext;
357 auto serviceToken = m_serviceToken;
358 auto sContext = m_context;
359 queue->push( [worker, &
principal, &es, streamID,parentContext,sContext, serviceToken]()
364 std::exception_ptr* ptr =
nullptr;
376 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr,
414 std::atomic<
bool> workStarted_;
418 template <
typename T>
419 class ModuleSignalSentry {
422 typename T::Context
const* context,
424 a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
426 if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
429 ~ModuleSignalSentry() {
430 if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
435 typename T::Context
const* context_;
441 namespace workerhelper {
452 return iWorker->
implDo(ep,es, mcc);
470 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
488 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
506 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
524 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
543 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
562 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
582 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
601 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
614 template <
typename T>
616 typename T::MyPrincipal
const& ep,
620 typename T::Context
const* context) {
621 waitingTasks_.add(task);
623 timesVisited_.fetch_add(1,std::memory_order_relaxed);
626 bool expected =
false;
627 if(workStarted_.compare_exchange_strong(expected,
true)) {
633 setPassed<T::isEvent_>();
634 waitingTasks_.doneWaiting(
nullptr);
640 this, ep,es,streamID,parentContext,context);
641 prefetchAsync(runTask, parentContext, ep);
647 typename T::MyPrincipal
const& ep,
651 typename T::Context
const* context) {
652 std::exception_ptr exceptionPtr;
656 if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
657 exceptionPtr = *iEPtr;
658 setException<T::isEvent_>(exceptionPtr);
660 setPassed<T::isEvent_>();
665 runModule<T>(ep,es,streamID,parentContext,context);
667 exceptionPtr = std::current_exception();
670 waitingTasks_.doneWaiting(exceptionPtr);
673 template <
typename T>
675 typename T::MyPrincipal
const&
principal,
679 typename T::Context
const* context) {
680 waitingTasks_.add(task);
681 bool expected =
false;
682 if(workStarted_.compare_exchange_strong(expected,
true)) {
685 auto toDo =[
this, &
principal, &es, streamID,parentContext,context, serviceToken]()
687 std::exception_ptr exceptionPtr;
698 exceptionPtr = std::current_exception();
700 this->waitingTasks_.doneWaiting(exceptionPtr);
702 if(
auto queue = this->serializeRunModule()) {
706 tbb::task::spawn(*task);
711 template <
typename T>
716 typename T::Context
const* context) {
719 timesVisited_.fetch_add(1,std::memory_order_relaxed);
725 case Pass:
return true;
726 case Fail:
return false;
728 std::rethrow_exception(cached_exception_);
732 bool expected =
false;
733 if(not workStarted_.compare_exchange_strong(expected,
true) ) {
736 waitTask->increment_ref_count();
738 waitingTasks_.add(waitTask.get());
740 waitTask->wait_for_all();
743 case Ready: assert(
false);
744 case Pass:
return true;
745 case Fail:
return false;
747 std::rethrow_exception(cached_exception_);
756 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
762 timesRun_.fetch_add(1,std::memory_order_relaxed);
763 rc = setPassed<T::isEvent_>();
764 waitingTasks_.doneWaiting(
nullptr);
773 auto sentryFunc = [
this](
void*) {
774 emitPostModuleEventPrefetchingSignal();
776 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
779 waitTask->set_ref_count(2);
781 prefetchAsync(waitTask.get(),parentContext, ep);
782 waitTask->decrement_ref_count();
783 waitTask->wait_for_all();
785 if(waitTask->exceptionPtr() !=
nullptr) {
787 if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
788 setException<T::isEvent_>(*waitTask->exceptionPtr());
789 waitingTasks_.doneWaiting(cached_exception_);
790 std::rethrow_exception(cached_exception_);
792 setPassed<T::isEvent_>();
793 waitingTasks_.doneWaiting(
nullptr);
800 prefetchSentry.release();
801 if(
auto queue = serializeRunModule()) {
803 queue->pushAndWait([&]() {
807 rc = runModule<T>(ep,es,streamID,parentContext,context);
813 rc = runModule<T>(ep,es,streamID,parentContext,context);
817 if(state_ == Exception) {
818 waitingTasks_.doneWaiting(cached_exception_);
819 std::rethrow_exception(cached_exception_);
822 waitingTasks_.doneWaiting(
nullptr);
827 template <
typename T>
832 typename T::Context
const* context) {
839 timesRun_.fetch_add(1,std::memory_order_relaxed);
849 setPassed<T::isEvent_>();
851 setFailed<T::isEvent_>();
857 if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
858 assert(not cached_exception_);
859 setException<T::isEvent_>(std::current_exception());
860 std::rethrow_exception(cached_exception_);
862 rc = setPassed<T::isEvent_>();
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
std::atomic< int > timesVisited_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
virtual std::string value() const override
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static PFTauRenderPlugin instance
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
std::atomic< int > timesExcept_
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
std::atomic< State > state_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
void respondToOpenInputFile(FileBlock const &fb)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual ~TransitionIDValueBase()
void preForkReleaseResources()
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::atomic< int > timesPassed_
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
std::atomic< int > timesRun_
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)