1 #ifndef FWCore_Framework_Worker_h 2 #define FWCore_Framework_Worker_h 61 #include <unordered_map> 66 class ProductResolverIndexHelper;
67 class ProductResolverIndexAndSkipBit;
70 class ProductRegistry;
71 class ThinnedAssociationsHelper;
74 namespace workerhelper {
90 operator bool() {
return serial_ !=
nullptr or limited_ !=
nullptr; }
117 template <
typename T>
118 bool doWork(
typename T::MyPrincipal
const&,
EventSetup const&
c,
121 typename T::Context
const* context);
122 template <
typename T>
124 typename T::MyPrincipal
const&,
EventSetup const& c,
127 typename T::Context
const* context);
129 template <
typename T>
131 typename T::MyPrincipal
const&,
135 typename T::Context
const* context);
137 template <
typename T>
138 std::exception_ptr runModuleDirectly(
typename T::MyPrincipal
const& ep,
142 typename T::Context
const* context);
145 waitingTasks_.add(task);
158 cached_exception_ = std::exception_ptr();
160 waitingTasks_.reset();
161 workStarted_ =
false;
162 numberOfPathsLeftToRun_ = numberOfPathsOn_;
171 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
176 virtual void updateLookup(
BranchType iBranchType,
178 virtual void resolvePutIndicies(
BranchType iBranchType,
179 std::unordered_multimap<
std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>>
const& iIndicies) = 0;
181 virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>&
modules,
183 std::map<std::string, ModuleDescription const*>
const& labelsToDesc)
const = 0;
187 virtual std::vector<ConsumesInfo> consumesInfo()
const = 0;
189 virtual Types moduleType()
const =0;
192 timesRun_.store(0,std::memory_order_release);
193 timesVisited_.store(0,std::memory_order_release);
194 timesPassed_.store(0,std::memory_order_release);
195 timesFailed_.store(0,std::memory_order_release);
196 timesExcept_.store(0,std::memory_order_release);
203 int timesRun()
const {
return timesRun_.load(std::memory_order_acquire); }
204 int timesVisited()
const {
return timesVisited_.load(std::memory_order_acquire); }
205 int timesPassed()
const {
return timesPassed_.load(std::memory_order_acquire); }
206 int timesFailed()
const {
return timesFailed_.load(std::memory_order_acquire); }
207 int timesExcept()
const {
return timesExcept_.load(std::memory_order_acquire); }
217 virtual bool implDoPrePrefetchSelection(
StreamID id,
236 virtual void implBeginJob() = 0;
237 virtual void implEndJob() = 0;
238 virtual void implBeginStream(
StreamID) = 0;
239 virtual void implEndStream(
StreamID) = 0;
247 template <
typename T>
248 bool runModule(
typename T::MyPrincipal
const&,
EventSetup const& c,
251 typename T::Context
const* context);
253 virtual void itemsToGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
254 virtual void itemsMayGet(
BranchType, std::vector<ProductResolverIndexAndSkipBit>&)
const = 0;
256 virtual std::vector<ProductResolverIndexAndSkipBit>
const& itemsToGetFrom(
BranchType)
const = 0;
259 virtual std::vector<ProductResolverIndex>
const& itemsShouldPutInEvent()
const = 0;
263 virtual void implRespondToOpenInputFile(
FileBlock const&
fb) = 0;
264 virtual void implRespondToCloseInputFile(
FileBlock const& fb) = 0;
284 template<
typename T>
289 std::ostringstream iost;
298 bool shouldRethrowException(std::exception_ptr iPtr,
303 template<
bool IS_EVENT>
306 timesPassed_.fetch_add(1,std::memory_order_relaxed);
312 template<
bool IS_EVENT>
315 timesFailed_.fetch_add(1,std::memory_order_relaxed);
321 template<
bool IS_EVENT>
324 timesExcept_.fetch_add(1,std::memory_order_relaxed);
326 cached_exception_ = iException;
328 return cached_exception_;
336 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
340 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr
const * iEPtr,
341 typename T::MyPrincipal
const& ep,
345 typename T::Context
const* context);
347 template<
typename T>
351 typename T::MyPrincipal
const& ep,
355 typename T::Context
const* context):
359 m_streamID(streamID),
360 m_parentContext(parentContext),
370 std::exception_ptr temp_excptr;
371 auto excptr = exceptionPtr();
375 m_worker->emitPostModuleEventPrefetchingSignal();
377 temp_excptr = std::current_exception();
379 excptr = &temp_excptr;
385 if(
auto queue = m_worker->serializeRunModule()) {
386 Worker* worker = m_worker;
389 auto streamID = m_streamID;
390 auto parentContext = m_parentContext;
391 auto serviceToken = m_serviceToken;
392 auto sContext = m_context;
393 queue.push( [worker, &
principal, &es, streamID,parentContext,sContext, serviceToken]()
398 std::exception_ptr* ptr =
nullptr;
410 m_worker->runModuleAfterAsyncPrefetch<
T>(excptr,
448 std::atomic<
bool> workStarted_;
452 template <
typename T>
453 class ModuleSignalSentry {
456 typename T::Context
const* context,
458 a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
460 if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
463 ~ModuleSignalSentry() {
464 if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
469 typename T::Context
const* context_;
475 namespace workerhelper {
486 return iWorker->
implDo(ep,es, mcc);
504 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
522 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
540 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
558 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
577 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
596 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
616 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
635 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
648 template <
typename T>
650 typename T::MyPrincipal
const& ep,
654 typename T::Context
const* context) {
655 waitingTasks_.add(task);
657 timesVisited_.fetch_add(1,std::memory_order_relaxed);
660 bool expected =
false;
661 if(workStarted_.compare_exchange_strong(expected,
true)) {
667 setPassed<T::isEvent_>();
668 waitingTasks_.doneWaiting(
nullptr);
674 this, ep,es,streamID,parentContext,context);
675 prefetchAsync(runTask, parentContext, ep);
681 typename T::MyPrincipal
const& ep,
685 typename T::Context
const* context) {
686 std::exception_ptr exceptionPtr;
690 if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
691 exceptionPtr = *iEPtr;
692 setException<T::isEvent_>(exceptionPtr);
694 setPassed<T::isEvent_>();
699 runModule<T>(ep,es,streamID,parentContext,context);
701 exceptionPtr = std::current_exception();
704 waitingTasks_.doneWaiting(exceptionPtr);
708 template <
typename T>
710 typename T::MyPrincipal
const&
principal,
714 typename T::Context
const* context) {
715 waitingTasks_.add(task);
716 bool expected =
false;
717 if(workStarted_.compare_exchange_strong(expected,
true)) {
720 auto toDo =[
this, &
principal, &es, streamID,parentContext,context, serviceToken]()
722 std::exception_ptr exceptionPtr;
733 exceptionPtr = std::current_exception();
735 this->waitingTasks_.doneWaiting(exceptionPtr);
737 if(
auto queue = this->serializeRunModule()) {
741 tbb::task::spawn(*task);
746 template <
typename T>
751 typename T::Context
const* context) {
754 timesVisited_.fetch_add(1,std::memory_order_relaxed);
760 case Pass:
return true;
761 case Fail:
return false;
763 std::rethrow_exception(cached_exception_);
767 bool expected =
false;
768 if(not workStarted_.compare_exchange_strong(expected,
true) ) {
771 waitTask->increment_ref_count();
773 waitingTasks_.add(waitTask.get());
775 waitTask->wait_for_all();
778 case Ready: assert(
false);
779 case Pass:
return true;
780 case Fail:
return false;
782 std::rethrow_exception(cached_exception_);
791 std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
797 timesRun_.fetch_add(1,std::memory_order_relaxed);
798 rc = setPassed<T::isEvent_>();
799 waitingTasks_.doneWaiting(
nullptr);
808 auto sentryFunc = [
this](
void*) {
809 emitPostModuleEventPrefetchingSignal();
811 std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
814 waitTask->set_ref_count(2);
816 prefetchAsync(waitTask.get(),parentContext, ep);
817 waitTask->decrement_ref_count();
818 waitTask->wait_for_all();
820 if(waitTask->exceptionPtr() !=
nullptr) {
822 if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
823 setException<T::isEvent_>(*waitTask->exceptionPtr());
824 waitingTasks_.doneWaiting(cached_exception_);
825 std::rethrow_exception(cached_exception_);
827 setPassed<T::isEvent_>();
828 waitingTasks_.doneWaiting(
nullptr);
835 prefetchSentry.release();
836 if(
auto queue = serializeRunModule()) {
838 queue.pushAndWait([&]() {
842 rc = runModule<T>(ep,es,streamID,parentContext,context);
848 rc = runModule<T>(ep,es,streamID,parentContext,context);
852 if(state_ == Exception) {
853 waitingTasks_.doneWaiting(cached_exception_);
854 std::rethrow_exception(cached_exception_);
857 waitingTasks_.doneWaiting(
nullptr);
862 template <
typename T>
867 typename T::Context
const* context) {
874 timesRun_.fetch_add(1,std::memory_order_relaxed);
884 setPassed<T::isEvent_>();
886 setFailed<T::isEvent_>();
892 if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
893 assert(not cached_exception_);
894 setException<T::isEvent_>(std::current_exception());
895 std::rethrow_exception(cached_exception_);
897 rc = setPassed<T::isEvent_>();
904 template <
typename T>
909 typename T::Context
const* context) {
911 timesVisited_.fetch_add(1,std::memory_order_relaxed);
912 std::exception_ptr
const* prefetchingException =
nullptr;
913 return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
tbb::task * execute() override
std::atomic< int > timesVisited_
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleDescription const & description() const
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
void callWhenDoneAsync(WaitingTask *task)
T::MyPrincipal const & m_principal
std::atomic< int > numberOfPathsLeftToRun_
std::exception_ptr setException(std::exception_ptr iException)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
static PFTauRenderPlugin instance
ParentContext const m_parentContext
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
std::atomic< int > timesExcept_
void push(const T &iAction)
asynchronously pushes functor iAction into queue
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
std::atomic< int > timesFailed_
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ActivityRegistry * activityRegistry()
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
ExceptionToActionTable const * actions_
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
ModuleCallingContext moduleCallingContext_
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
ModuleDescription const * descPtr() const
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void registerThinnedAssociations(ProductRegistry const ®istry, ThinnedAssociationsHelper &helper)
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
std::atomic< State > state_
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
TransitionIDValue(T const &iP)
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
std::string value() const override
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
void respondToOpenInputFile(FileBlock const &fb)
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
void respondToCloseInputFile(FileBlock const &fb)
virtual ~TransitionIDValueBase()
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void emitPostModuleEventPrefetchingSignal()
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
std::atomic< int > timesPassed_
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
std::atomic< int > timesRun_
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
ServiceToken m_serviceToken
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)