1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
58 #include <unordered_map>
63 class ProductResolverIndexHelper;
64 class ProductResolverIndexAndSkipBit;
67 class ProductRegistry;
68 class ThinnedAssociationsHelper;
71 namespace workerhelper {
90 typename T::Context
const* context);
93 typename T::MyPrincipal
const&,
EventSetup const&
c,
96 typename T::Context
const* context);
135 std::unordered_multimap<std::string, edm::ProductResolverIndex>
const& iIndicies) = 0;
139 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
141 virtual std::vector<ConsumesInfo>
consumesInfo()
const = 0;
146 timesRun_.store(0,std::memory_order_relaxed);
201 template <
typename T>
205 typename T::Context
const* context);
210 virtual std::vector<ProductResolverIndexAndSkipBit>
const&
itemsToGetFromEvent()
const = 0;
219 unsigned int iNumberOfChildren) = 0;
239 template<
typename T>
244 std::ostringstream iost;
258 template<
bool IS_EVENT>
267 template<
bool IS_EVENT>
276 template<
bool IS_EVENT>
296 typename T::MyPrincipal
const& ep,
300 typename T::Context
const* context);
302 template<
typename T>
306 typename T::MyPrincipal
const& ep,
310 typename T::Context
const* context):
325 std::exception_ptr temp_excptr;
331 temp_excptr = std::current_exception();
333 excptr = &temp_excptr;
346 queue->push( [worker, &
principal, &es, streamID,parentContext,sContext, serviceToken]()
351 std::exception_ptr* ptr =
nullptr;
405 template <
typename T>
406 class ModuleSignalSentry {
409 typename T::Context
const* context,
416 ~ModuleSignalSentry() {
422 typename T::Context
const* context_;
428 namespace workerhelper {
439 return iWorker->
implDo(ep,es, mcc);
457 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
475 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
493 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
511 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
530 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
549 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
569 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
588 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
601 template <
typename T>
603 typename T::MyPrincipal
const& ep,
607 typename T::Context
const* context) {
613 bool expected =
false;
614 if(
workStarted_.compare_exchange_strong(expected,
true)) {
619 setPassed<T::isEvent_>();
625 this, ep,es,streamID,parentContext,context);
632 typename T::MyPrincipal
const& ep,
636 typename T::Context
const* context) {
642 std::rethrow_exception(*iEPtr);
645 runModule<T>(ep,es,streamID,parentContext,context);
651 std::ostringstream iost;
653 iost<<
"Prefetching for module ";
655 iost<<
"Calling method for module ";
660 setException<T::isEvent_>(std::current_exception());
664 setPassed<T::isEvent_>();
670 template <
typename T>
675 typename T::Context
const* context) {
684 case Pass:
return true;
685 case Fail:
return false;
691 bool expected =
false;
692 if(not
workStarted_.compare_exchange_strong(expected,
true) ) {
695 waitTask->increment_ref_count();
699 waitTask->wait_for_all();
703 case Pass:
return true;
704 case Fail:
return false;
715 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&
moduleCallingContext_,resetContext);
724 timesRun_.fetch_add(1,std::memory_order_relaxed);
725 rc = setPassed<T::isEvent_>();
734 auto sentryFunc = [
this](
void*) {
737 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(
actReg_.get(),sentryFunc);
740 waitTask->set_ref_count(2);
743 waitTask->decrement_ref_count();
744 waitTask->wait_for_all();
746 if(waitTask->exceptionPtr() !=
nullptr) {
747 std::rethrow_exception(*(waitTask->exceptionPtr()));
751 prefetchSentry.release();
754 queue->pushAndWait([&]() {
757 rc = runModule<T>(ep,es,streamID,parentContext,context);
760 rc = runModule<T>(ep,es,streamID,parentContext,context);
768 setException<T::isEvent_>(std::current_exception());
770 std::rethrow_exception(cached_exception_);
772 rc = setPassed<T::isEvent_>();
780 template <
typename T>
785 typename T::Context
const* context) {
792 timesRun_.fetch_add(1,std::memory_order_relaxed);
798 setPassed<T::isEvent_>();
800 setFailed<T::isEvent_>();
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
bool shouldRethrowException(cms::Exception &ex, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
std::atomic< int > timesVisited_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void resetModuleDescription(ModuleDescription const *)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
StreamContext const * getStreamContext() const
std::exception_ptr setException(std::exception_ptr iException)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
virtual void modulesWhoseProductsAreConsumed(std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static PFTauRenderPlugin instance
ParentContext const m_parentContext
std::atomic< int > timesExcept_
std::atomic< bool > workStarted_
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
void add(WaitingTask *)
Adds task to the waiting list.
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
std::string const & moduleName() const
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
virtual SerialTaskQueueChain * serializeRunModule()=0
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void beginStream(StreamID id, StreamContext &streamContext)
virtual void implPreForkReleaseResources()=0
void reset()
Resets access to the resource so that added tasks will wait.
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
virtual std::string workerType() const =0
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, edm::ProductResolverIndex > const &iIndicies)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
ModuleCallingContext moduleCallingContext_
std::exception_ptr cached_exception_
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Worker & operator=(Worker const &)=delete
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
edm::WaitingTaskList waitingTasks_
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
ModuleDescription const * moduleDescription() const
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
std::atomic< State > state_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
virtual void implEndJob()=0
virtual std::string value() const override
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
virtual Types moduleType() const =0
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
void addContext(std::string const &context)
virtual void implBeginStream(StreamID)=0
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
virtual void implEndStream(StreamID)=0
void respondToCloseInputFile(FileBlock const &fb)
void preForkReleaseResources()
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
static void exceptionContext(const std::string &iID, bool iIsEvent, cms::Exception &ex, ModuleCallingContext const *mcc)
std::atomic< int > timesPassed_
void endStream(StreamID id, StreamContext &streamContext)
void postDoEvent(EventPrincipal const &)
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFromEvent() const =0
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
std::atomic< int > timesRun_
virtual std::string value() const =0
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0