CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference (abstract)

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 

Public Types

enum  ConcurrencyTypes {
  kGlobal, kLimited, kOne, kStream,
  kLegacy
}
 
enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTaskHolder task)
 
void clearCounters ()
 
void clearModule ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * description () const
 
void doTransformAsync (WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
 
template<typename T >
void doWorkAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual SerialTaskQueue * globalLuminosityBlocksQueue ()=0
 
virtual SerialTaskQueue * globalRunsQueue ()=0
 
virtual bool hasAccumulator () const =0
 
virtual ConcurrencyTypes moduleConcurrencyType () const =0
 
virtual void modulesWhoseProductsAreConsumed (std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToCloseOutputFile ()
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual void selectInputProcessBlocks (ProductRegistry const &, ProcessBlockHelperBase const &)=0
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath (EventPrincipal const &iEvent)
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual size_t transformIndex (edm::BranchDescription const &) const =0
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual void updateLookup (eventsetup::ESRecordsToProxyIndices const &)=0
 
edm::WaitingTaskList & waitingTaskList ()
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsInputProcessBlocks () const =0
 
virtual bool wantsProcessBlocks () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void doClearModule ()=0
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoAccessInputProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual void implDoAcquire (EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
 
virtual bool implDoBegin (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBegin (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBeginProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEndProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoPrePrefetchSelection (StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual void implDoTransformAsync (WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual ProductResolverIndex itemToGetForTransform (size_t iTransformIndex) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void edPrefetchAsync (WaitingTaskHolder, ServiceToken const &, Principal const &) const
 
void emitPostModuleEventPrefetchingSignal ()
 
void emitPostModuleGlobalPrefetchingSignal ()
 
void emitPostModuleStreamPrefetchingSignal ()
 
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom (Transition) const =0
 
void esPrefetchAsync (WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
 
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom (Transition) const =0
 
std::exception_ptr handleExternalWorkException (std::exception_ptr iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToCloseOutputFile ()=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
bool needsESPrefetching (Transition iTrans) const noexcept
 
virtual void preActionBeforeRunEventAsync (WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
template<typename T >
void prefetchAsync (WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
 
void runAcquire (EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
 
template<typename T >
bool runModule (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
bool moduleValid_ = true
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 92 of file Worker.h.

Member Enumeration Documentation

◆ ConcurrencyTypes

Enumerator
kGlobal 
kLimited 
kOne 
kStream 
kLegacy 

Definition at line 96 of file Worker.h.

◆ State

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 94 of file Worker.h.

◆ Types

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 95 of file Worker.h.

Constructor & Destructor Documentation

◆ Worker() [1/2]

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 89 of file Worker.cc.

90  : timesRun_(0),
91  timesVisited_(0),
92  timesPassed_(0),
93  timesFailed_(0),
94  timesExcept_(0),
95  state_(Ready),
99  actions_(iActions),
101  actReg_(),
102  earlyDeleteHelper_(nullptr),
103  workStarted_(false),
std::atomic< int > timesVisited_
Definition: Worker.h:601
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
std::atomic< int > timesExcept_
Definition: Worker.h:604
std::atomic< bool > workStarted_
Definition: Worker.h:619
std::atomic< int > timesFailed_
Definition: Worker.h:603
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
ExceptionToActionTable const * actions_
Definition: Worker.h:611
bool ranAcquireWithoutException_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
std::exception_ptr cached_exception_
Definition: Worker.h:612
std::atomic< State > state_
Definition: Worker.h:605
int numberOfPathsOn_
Definition: Worker.h:606
std::atomic< int > timesPassed_
Definition: Worker.h:602
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616
std::atomic< int > timesRun_
Definition: Worker.h:600

◆ ~Worker()

edm::Worker::~Worker ( )
virtual

Definition at line 106 of file Worker.cc.

106 {}

◆ Worker() [2/2]

edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

◆ activityRegistry()

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 298 of file Worker.h.

References actReg_.

298 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614

◆ addedToPath()

void edm::Worker::addedToPath ( )
inline

Definition at line 240 of file Worker.h.

References numberOfPathsOn_.

240 { ++numberOfPathsOn_; }
int numberOfPathsOn_
Definition: Worker.h:606

◆ beginJob()

void edm::Worker::beginJob ( void  )

Definition at line 273 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob().

273  {
274  try {
275  convertException::wrap([&]() {
276  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
277  implBeginJob();
278  });
279  } catch (cms::Exception& ex) {
280  state_ = Exception;
281  std::ostringstream ost;
282  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
283  << "'";
284  ex.addContext(ost.str());
285  throw;
286  }
287  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
std::atomic< State > state_
Definition: Worker.h:605
ModuleDescription const * description() const
Definition: Worker.h:198
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:165
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ beginStream()

void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 306 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

306  {
307  try {
308  convertException::wrap([&]() {
309  streamContext.setTransition(StreamContext::Transition::kBeginStream);
310  streamContext.setEventID(EventID(0, 0, 0));
311  streamContext.setRunIndex(RunIndex::invalidRunIndex());
312  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
313  streamContext.setTimestamp(Timestamp());
314  ParentContext parentContext(&streamContext);
315  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
316  moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
317  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
318  implBeginStream(id);
319  });
320  } catch (cms::Exception& ex) {
321  state_ = Exception;
322  std::ostringstream ost;
323  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
324  << "'";
325  ex.addContext(ost.str());
326  throw;
327  }
328  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:605
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ callWhenDoneAsync()

void edm::Worker::callWhenDoneAsync ( WaitingTaskHolder  task)
inline

Definition at line 177 of file Worker.h.

References edm::WaitingTaskList::add(), eostools::move(), TrackValidation_cff::task, and waitingTasks_.

Referenced by edm::SwitchProducerProductResolver::prefetchAsync_().

edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
def move(src, dest)
Definition: eostools.py:511

◆ clearCounters()

void edm::Worker::clearCounters ( )
inline

Definition at line 232 of file Worker.h.

References timesExcept_, timesFailed_, timesPassed_, timesRun_, and timesVisited_.

Referenced by edm::StreamSchedule::clearCounters().

232  {
233  timesRun_.store(0, std::memory_order_release);
234  timesVisited_.store(0, std::memory_order_release);
235  timesPassed_.store(0, std::memory_order_release);
236  timesFailed_.store(0, std::memory_order_release);
237  timesExcept_.store(0, std::memory_order_release);
238  }
std::atomic< int > timesVisited_
Definition: Worker.h:601
std::atomic< int > timesExcept_
Definition: Worker.h:604
std::atomic< int > timesFailed_
Definition: Worker.h:603
std::atomic< int > timesPassed_
Definition: Worker.h:602
std::atomic< int > timesRun_
Definition: Worker.h:600

◆ clearModule()

void edm::Worker::clearModule ( )
inline

Definition at line 123 of file Worker.h.

References doClearModule(), and moduleValid_.

123  {
124  moduleValid_ = false;
125  doClearModule();
126  }
bool moduleValid_
Definition: Worker.h:621
virtual void doClearModule()=0

◆ consumesInfo()

virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual

◆ convertCurrentProcessAlias()

virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

◆ description()

ModuleDescription const* edm::Worker::description ( ) const
inline

◆ doClearModule()

virtual void edm::Worker::doClearModule ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by clearModule().

◆ doTransformAsync()

void edm::Worker::doTransformAsync ( WaitingTaskHolder  iTask,
size_t  iTransformIndex,
EventPrincipal const &  iPrincipal,
ServiceToken const &  iToken,
StreamID  ,
ModuleCallingContext const &  mcc,
StreamContext const *   
)

Definition at line 240 of file Worker.cc.

References edm::WaitingTaskHolder::doneWaiting(), edm::WaitingTaskHolder::group(), implDoTransformAsync(), itemToGetForTransform(), edm::make_waiting_task(), edm::ModuleCallingContext::parent(), class-composition::parent, edm::Principal::prefetchAsync(), and TrackValidation_cff::task.

Referenced by edm::TransformingProductResolver::prefetchAsync_().

246  {
247  ServiceWeakToken weakToken = iToken;
248 
249  //Need to make the services available early so other services can see them
250  auto task = make_waiting_task([this, iTask, weakToken, &iPrincipal, iTransformIndex, parent = mcc.parent()](
251  std::exception_ptr const* iExcept) mutable {
252  if (iExcept) {
253  iTask.doneWaiting(*iExcept);
254  return;
255  }
256  implDoTransformAsync(iTask, iTransformIndex, iPrincipal, parent, weakToken);
257  });
258 
259  //NOTE: need different ModuleCallingContext. The ProductResolver will copy the context in order to get
260  // a longer lifetime than this function call.
261  iPrincipal.prefetchAsync(
262  WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
263  }
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0

◆ doWorkAsync()

template<typename T >
void edm::Worker::doWorkAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 968 of file Worker.h.

References edm::WaitingTaskList::add(), visDQMUpload::context, watchdog::group, hasAcquire(), info(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), moduleCallingContext_, eostools::move(), AlCaHLTBitMon_ParallelJobs::p, prePrefetchSelectionAsync(), fetchall_from_DQM_v2::release, alignCSCRings::s, edm::ModuleCallingContext::setContext(), submitPVValidationJobs::t, TrackValidation_cff::task, timesVisited_, unpackBuffers-CaloStage2::token, waitingTasks_, and workStarted_.

Referenced by edm::UnscheduledProductResolver::prefetchAsync_(), and edm::WorkerInPath::runWorkerAsync().

973  {
974  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
975  return;
976  }
977 
978  //Need to check workStarted_ before adding to waitingTasks_
979  bool expected = false;
980  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
981 
983  if constexpr (T::isEvent_) {
984  timesVisited_.fetch_add(1, std::memory_order_relaxed);
985  }
986 
987  if (workStarted) {
989 
990  //if have TriggerResults based selection we want to reject the event before doing prefetching
991  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
992  //We need to run the selection in a different task so that
993  // we can prefetch the data needed for the selection
994  WaitingTask* moduleTask =
995  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
996 
997  //make sure the task is either run or destroyed
998  struct DestroyTask {
999  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1000 
1001  ~DestroyTask() {
1002  auto p = m_task.exchange(nullptr);
1003  if (p) {
1004  TaskSentry s{p};
1005  }
1006  }
1007 
1008  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1009 
1010  private:
1011  std::atomic<edm::WaitingTask*> m_task;
1012  };
1013  if constexpr (T::isEvent_) {
1014  if (hasAcquire()) {
1015  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1016  ServiceWeakToken weakToken = token;
1017  auto* group = task.group();
1018  moduleTask = make_waiting_task(
1019  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1020  WaitingTaskWithArenaHolder runTaskHolder(
1021  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1022  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1023  t.execute();
1024  });
1025  }
1026  }
1027  auto* group = task.group();
1028  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1029  ServiceWeakToken weakToken = token;
1030  auto selectionTask =
1031  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1032  std::exception_ptr const*) mutable {
1033  ServiceRegistry::Operate guard(weakToken.lock());
1034  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1035  weakToken.lock(),
1036  parentContext,
1037  info,
1038  T::transition_);
1039  });
1040  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1041  } else {
1042  WaitingTask* moduleTask =
1043  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1044  auto group = task.group();
1045  if constexpr (T::isEvent_) {
1046  if (hasAcquire()) {
1047  WaitingTaskWithArenaHolder runTaskHolder(
1048  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1049  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1050  }
1051  }
1052  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1053  }
1054  }
1055  }
virtual bool hasAcquire() const =0
std::atomic< int > timesVisited_
Definition: Worker.h:601
static const TGPicture * info(bool iBackgroundIsBlack)
std::atomic< bool > workStarted_
Definition: Worker.h:619
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
def move(src, dest)
Definition: eostools.py:511

◆ doWorkNoPrefetchingAsync()

template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  serviceToken,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1083 of file Worker.h.

References edm::WaitingTaskList::add(), CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), esPrefetchAsync(), watchdog::group, info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), needsESPrefetching(), createBeamHaloJobs::queue, serializeRunModule(), TrackValidation_cff::task, waitingTasks_, and workStarted_.

Referenced by edm::WorkerInPath::runWorkerAsync().

1088  {
1089  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1090  return;
1091  }
1092 
1093  //Need to check workStarted_ before adding to waitingTasks_
1094  bool expected = false;
1095  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1096 
1098  if (workStarted) {
1099  ServiceWeakToken weakToken = serviceToken;
1100  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1101  std::exception_ptr exceptionPtr;
1102  // Caught exception is propagated via WaitingTaskList
1103  CMS_SA_ALLOW try {
1104  //Need to make the services available
1105  ServiceRegistry::Operate guard(weakToken.lock());
1106 
1107  this->runModule<T>(info, streamID, parentContext, context);
1108  } catch (...) {
1109  exceptionPtr = std::current_exception();
1110  }
1111  this->waitingTasks_.doneWaiting(exceptionPtr);
1112  };
1113 
1114  if (needsESPrefetching(T::transition_)) {
1115  auto group = task.group();
1116  auto afterPrefetch =
1117  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1118  if (iExcept) {
1119  this->waitingTasks_.doneWaiting(*iExcept);
1120  } else {
1121  if (auto queue = this->serializeRunModule()) {
1122  queue.push(*group, toDo);
1123  } else {
1124  group->run(toDo);
1125  }
1126  }
1127  });
1128  esPrefetchAsync(
1129  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1130  } else {
1131  auto group = task.group();
1132  if (auto queue = this->serializeRunModule()) {
1133  queue.push(*group, toDo);
1134  } else {
1135  group->run(toDo);
1136  }
1137  }
1138  }
1139  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::atomic< bool > workStarted_
Definition: Worker.h:619
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:364
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
virtual TaskQueueAdaptor serializeRunModule()=0
def move(src, dest)
Definition: eostools.py:511

◆ edPrefetchAsync()

void edm::Worker::edPrefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
Principal const &  iPrincipal 
) const
private

Definition at line 224 of file Worker.cc.

References edm::Principal::branchType(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetFrom(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and unpackBuffers-CaloStage2::token.

Referenced by prefetchAsync().

224  {
225  // Prefetch products the module declares it consumes
226  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
227 
228  for (auto const& item : items) {
229  ProductResolverIndex productResolverIndex = item.productResolverIndex();
230  bool skipCurrentProcess = item.skipCurrentProcess();
231  if (productResolverIndex != ProductResolverIndexAmbiguous) {
232  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
233  }
234  }
235  }
unsigned int ProductResolverIndex
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609

◆ emitPostModuleEventPrefetchingSignal()

void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 368 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

368  {
369  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
370  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
StreamContext const * getStreamContext() const

◆ emitPostModuleGlobalPrefetchingSignal()

void edm::Worker::emitPostModuleGlobalPrefetchingSignal ( )
inline private

Definition at line 377 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getGlobalContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

377  {
378  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
379  moduleCallingContext_);
380  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
GlobalContext const * getGlobalContext() const

◆ emitPostModuleStreamPrefetchingSignal()

void edm::Worker::emitPostModuleStreamPrefetchingSignal ( )
inline private

Definition at line 372 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

372  {
373  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
374  moduleCallingContext_);
375  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
StreamContext const * getStreamContext() const

◆ endJob()

void edm::Worker::endJob ( void  )

Definition at line 289 of file Worker.cc.

References actReg_, cms::Exception::addContext(), cms::cuda::assert(), submitPVResolutionJobs::desc, description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

289  {
290  try {
291  convertException::wrap([&]() {
292  ModuleDescription const* desc = description();
293  assert(desc != nullptr);
294  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
295  implEndJob();
296  });
297  } catch (cms::Exception& ex) {
298  state_ = Exception;
299  std::ostringstream ost;
300  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
301  ex.addContext(ost.str());
302  throw;
303  }
304  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
assert(be >=bs)
std::atomic< State > state_
Definition: Worker.h:605
virtual void implEndJob()=0
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:165
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ endStream()

void edm::Worker::endStream ( StreamID  id,
StreamContext & streamContext
)

Definition at line 330 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

330  {
331  try {
332  convertException::wrap([&]() {
333  streamContext.setTransition(StreamContext::Transition::kEndStream);
334  streamContext.setEventID(EventID(0, 0, 0));
335  streamContext.setRunIndex(RunIndex::invalidRunIndex());
336  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
337  streamContext.setTimestamp(Timestamp());
338  ParentContext parentContext(&streamContext);
339  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
341  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
342  implEndStream(id);
343  });
344  } catch (cms::Exception& ex) {
345  state_ = Exception;
346  std::ostringstream ost;
347  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
348  << "'";
349  ex.addContext(ost.str());
350  throw;
351  }
352  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:605
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ esItemsToGetFrom()

virtual std::vector<ESProxyIndex> const& edm::Worker::esItemsToGetFrom ( Transition  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync(), and needsESPrefetching().

◆ esPrefetchAsync()

void edm::Worker::esPrefetchAsync ( WaitingTaskHolder  iTask,
EventSetupImpl const &  iImpl,
Transition  iTrans,
ServiceToken const &  iToken 
)
private

Definition at line 199 of file Worker.cc.

References cms::cuda::assert(), esItemsToGetFrom(), esRecordsToGetFrom(), edm::EventSetupImpl::findImpl(), mps_fire::i, mps_monitormerge::items, moduleCallingContext_, edm::NumberOfEventSetupTransitions, and edm::eventsetup::EventSetupRecordImpl::prefetchAsync().

Referenced by doWorkNoPrefetchingAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), and edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync().

202  {
204  return;
205  }
206  auto const& recs = esRecordsToGetFrom(iTrans);
207  auto const& items = esItemsToGetFrom(iTrans);
208 
209  assert(items.size() == recs.size());
210  if (items.empty()) {
211  return;
212  }
213 
214  for (size_t i = 0; i != items.size(); ++i) {
215  if (recs[i] != ESRecordIndex{}) {
216  auto rec = iImpl.findImpl(recs[i]);
217  if (rec) {
218  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
219  }
220  }
221  }
222  }
assert(be >=bs)
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0

◆ esRecordsToGetFrom()

virtual std::vector<ESRecordIndex> const& edm::Worker::esRecordsToGetFrom ( Transition  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync().

◆ globalLuminosityBlocksQueue()

virtual SerialTaskQueue* edm::Worker::globalLuminosityBlocksQueue ( )
pure virtual

◆ globalRunsQueue()

virtual SerialTaskQueue* edm::Worker::globalRunsQueue ( )
pure virtual

◆ handleExternalWorkException()

std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 417 of file Worker.cc.

References edm::exceptionContext(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

417  {
419  try {
420  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
421  } catch (cms::Exception& ex) {
422  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
424  return std::current_exception();
425  }
426  }
427  return iEPtr;
428  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool ranAcquireWithoutException_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
auto wrap(F iFunc) -> decltype(iFunc())

◆ hasAccumulator()

virtual bool edm::Worker::hasAccumulator ( ) const
pure virtual

◆ hasAcquire()

virtual bool edm::Worker::hasAcquire ( ) const
private, pure virtual

◆ implBeginJob()

virtual void edm::Worker::implBeginJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

◆ implBeginStream()

virtual void edm::Worker::implBeginStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

◆ implDo()

virtual bool edm::Worker::implDo ( EventTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoAccessInputProcessBlock()

virtual bool edm::Worker::implDoAccessInputProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoAcquire()

virtual void edm::Worker::implDoAcquire ( EventTransitionInfo const &  ,
ModuleCallingContext const *  ,
WaitingTaskWithArenaHolder &
)
protected, pure virtual

◆ implDoBegin() [1/2]

virtual bool edm::Worker::implDoBegin ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoBegin() [2/2]

virtual bool edm::Worker::implDoBegin ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoBeginProcessBlock()

virtual bool edm::Worker::implDoBeginProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoEnd() [1/2]

virtual bool edm::Worker::implDoEnd ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoEnd() [2/2]

virtual bool edm::Worker::implDoEnd ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoEndProcessBlock()

virtual bool edm::Worker::implDoEndProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoPrePrefetchSelection()

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  ,
EventPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamBegin() [1/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamBegin() [2/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoStreamEnd() [1/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamEnd() [2/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoTransformAsync()

virtual void edm::Worker::implDoTransformAsync ( WaitingTaskHolder  ,
size_t  iTransformIndex,
EventPrincipal const &  ,
ParentContext const &  ,
ServiceWeakToken const &   
)
protected, pure virtual

◆ implEndJob()

virtual void edm::Worker::implEndJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

◆ implEndStream()

virtual void edm::Worker::implEndStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

◆ implNeedToRunSelection()

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protected, pure virtual

◆ implRegisterThinnedAssociations()

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper &
)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by registerThinnedAssociations().

◆ implRespondToCloseInputFile()

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseInputFile().

◆ implRespondToCloseOutputFile()

virtual void edm::Worker::implRespondToCloseOutputFile ( )
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseOutputFile().

◆ implRespondToOpenInputFile()

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToOpenInputFile().

◆ itemsMayGet()

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGet()

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGetForSelection()

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protected, pure virtual

◆ itemsToGetFrom()

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edPrefetchAsync().

◆ itemToGetForTransform()

virtual ProductResolverIndex edm::Worker::itemToGetForTransform ( size_t  iTransformIndex) const
protected, pure virtual

◆ moduleConcurrencyType()

virtual ConcurrencyTypes edm::Worker::moduleConcurrencyType ( ) const
pure virtual

◆ modulesWhoseProductsAreConsumed()

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &  modules,
std::vector< ModuleProcessName > &  modulesInPreviousProcesses,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const *> const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

◆ moduleType()

virtual Types edm::Worker::moduleType ( ) const
pure virtual

◆ needsESPrefetching()

bool edm::Worker::needsESPrefetching ( Transition  iTrans) const
inline, private, noexcept

Definition at line 364 of file Worker.h.

References esItemsToGetFrom(), and edm::NumberOfEventSetupTransitions.

Referenced by doWorkNoPrefetchingAsync().

364  {
365  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
366  }
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0

◆ operator=()

Worker& edm::Worker::operator= ( Worker const &  )
delete

◆ postDoEvent()

void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 372 of file Worker.cc.

References earlyDeleteHelper_, and iEvent.

372  {
373  if (earlyDeleteHelper_) {
374  earlyDeleteHelper_->moduleRan(iEvent);
375  }
376  }
int iEvent
Definition: GenABIO.cc:224
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616

◆ preActionBeforeRunEventAsync()

virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTaskHolder  iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

◆ prefetchAsync()

template<typename T >
void edm::Worker::prefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
ParentContext const &  parentContext,
typename T::TransitionInfoType const &  transitionInfo,
Transition  iTransition 
)
private

Definition at line 942 of file Worker.h.

References actReg_, edm::Principal::branchType(), edPrefetchAsync(), edm::ModuleCallingContext::getGlobalContext(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::ModuleCallingContext::setContext(), and unpackBuffers-CaloStage2::token.

946  {
947  Principal const& principal = transitionInfo.principal();
948 
950 
951  if constexpr (T::isEvent_) {
952  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
953  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
954  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
955  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
956  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
957  }
958 
959  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
960  edPrefetchAsync(iTask, token, principal);
961 
962  if (principal.branchType() == InEvent) {
964  }
965  }
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
StreamContext const * getStreamContext() const
GlobalContext const * getGlobalContext() const

◆ prePrefetchSelectionAsync() [1/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  group,
WaitingTask * task,
ServiceToken const &  token,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 150 of file Worker.cc.

References CMS_SA_ALLOW, edm::TaskBase::decrement_ref_count(), edm::WaitingTaskList::doneWaiting(), edm::TaskBase::execute(), watchdog::group, implDoPrePrefetchSelection(), edm::TaskBase::increment_ref_count(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetForSelection(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, alignCSCRings::s, timesRun_, unpackBuffers-CaloStage2::token, and waitingTasks_.

Referenced by doWorkAsync().

154  {
155  successTask->increment_ref_count();
156 
157  ServiceWeakToken weakToken = token;
158  auto choiceTask =
159  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
160  ServiceRegistry::Operate guard(weakToken.lock());
161  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
162  CMS_SA_ALLOW try {
163  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
164  timesRun_.fetch_add(1, std::memory_order_relaxed);
165  setPassed<true>();
166  waitingTasks_.doneWaiting(nullptr);
167  //TBB requires that destroyed tasks have count 0
168  if (0 == successTask->decrement_ref_count()) {
169  TaskSentry s(successTask);
170  }
171  return;
172  }
173  } catch (...) {
174  }
175  if (0 == successTask->decrement_ref_count()) {
176  group.run([successTask]() {
177  TaskSentry s(successTask);
178  successTask->execute();
179  });
180  }
181  });
182 
183  WaitingTaskHolder choiceHolder{group, choiceTask};
184 
185  std::vector<ProductResolverIndexAndSkipBit> items;
187 
188  for (auto const& item : items) {
189  ProductResolverIndex productResolverIndex = item.productResolverIndex();
190  bool skipCurrentProcess = item.skipCurrentProcess();
191  if (productResolverIndex != ProductResolverIndexAmbiguous) {
192  iPrincipal->prefetchAsync(
193  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
194  }
195  }
196  choiceHolder.doneWaiting(std::exception_ptr{});
197  }
#define CMS_SA_ALLOW
unsigned int ProductResolverIndex
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< int > timesRun_
Definition: Worker.h:600

◆ prePrefetchSelectionAsync() [2/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  ,
WaitingTask * task,
ServiceToken const &  ,
StreamID  stream,
void const *   
)
inline

Definition at line 141 of file Worker.h.

References cms::cuda::assert().

142  {
143  assert(false);
144  }
assert(be >=bs)

◆ registerThinnedAssociations()

void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper & helper
)

Definition at line 354 of file Worker.cc.

References cms::Exception::addContext(), description(), implRegisterThinnedAssociations(), and HerwigMaxPtPartonFilter_cfi::moduleLabel.

354  {
355  try {
357  } catch (cms::Exception& ex) {
358  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
359  throw ex;
360  }
361  }
Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:165

◆ reset()

void edm::Worker::reset ( void  )
inline

Definition at line 188 of file Worker.h.

References cached_exception_, numberOfPathsLeftToRun_, numberOfPathsOn_, Ready, edm::WaitingTaskList::reset(), state_, waitingTasks_, and workStarted_.

Referenced by edm::WorkerManager::resetAll().

188  {
189  cached_exception_ = std::exception_ptr();
190  state_ = Ready;
192  workStarted_ = false;
194  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
std::atomic< bool > workStarted_
Definition: Worker.h:619
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:612
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
std::atomic< State > state_
Definition: Worker.h:605
int numberOfPathsOn_
Definition: Worker.h:606

◆ resetModuleDescription()

void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected

◆ resolvePutIndicies()

virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ respondToCloseInputFile()

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 184 of file Worker.h.

References implRespondToCloseInputFile().

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0

◆ respondToCloseOutputFile()

void edm::Worker::respondToCloseOutputFile ( )
inline

Definition at line 185 of file Worker.h.

References implRespondToCloseOutputFile().

virtual void implRespondToCloseOutputFile()=0

◆ respondToOpenInputFile()

void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 183 of file Worker.h.

References implRespondToOpenInputFile().

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0

◆ runAcquire()

void edm::Worker::runAcquire ( EventTransitionInfo const &  info,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder & holder
)
private

Definition at line 378 of file Worker.cc.

References edm::exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

380  {
381  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
382  try {
383  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
384  } catch (cms::Exception& ex) {
386  if (shouldRethrowException(std::current_exception(), parentContext, true)) {
387  timesRun_.fetch_add(1, std::memory_order_relaxed);
388  throw;
389  }
390  }
391  }
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
static const TGPicture * info(bool iBackgroundIsBlack)
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:600

◆ runAcquireAfterAsyncPrefetch()

void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
EventTransitionInfo const &  eventTransitionInfo,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 393 of file Worker.cc.

References CMS_SA_ALLOW, edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), and shouldRethrowException().

396  {
398  std::exception_ptr exceptionPtr;
399  if (iEPtr) {
400  if (shouldRethrowException(iEPtr, parentContext, true)) {
401  exceptionPtr = iEPtr;
402  }
404  } else {
405  // Caught exception is propagated via WaitingTaskWithArenaHolder
406  CMS_SA_ALLOW try {
407  runAcquire(eventTransitionInfo, parentContext, holder);
409  } catch (...) {
410  exceptionPtr = std::current_exception();
411  }
412  }
413  // It is important this is after runAcquire completely finishes
414  holder.doneWaiting(exceptionPtr);
415  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:378

◆ runModule()

template<typename T >
bool edm::Worker::runModule ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1142 of file Worker.h.

References actReg_, cms::cuda::assert(), cached_exception_, visDQMUpload::context, edm::exceptionContext(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

1145  {
1146  //unscheduled producers should advance this
1147  //if (T::isEvent_) {
1148  // ++timesVisited_;
1149  //}
1150  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1151  if constexpr (T::isEvent_) {
1152  timesRun_.fetch_add(1, std::memory_order_relaxed);
1153  }
1154 
1155  bool rc = true;
1156  try {
1157  convertException::wrap([&]() {
1158  rc = workerhelper::CallImpl<T>::call(
1159  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1160 
1161  if (rc) {
1162  setPassed<T::isEvent_>();
1163  } else {
1164  setFailed<T::isEvent_>();
1165  }
1166  });
1167  } catch (cms::Exception& ex) {
1169  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1171  setException<T::isEvent_>(std::current_exception());
1172  std::rethrow_exception(cached_exception_);
1173  } else {
1174  rc = setPassed<T::isEvent_>();
1175  }
1176  }
1177 
1178  return rc;
1179  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
std::exception_ptr cached_exception_
Definition: Worker.h:612
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:600

◆ runModuleAfterAsyncPrefetch()

template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1058 of file Worker.h.

References CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, edm::ModuleCallingContext::setContext(), shouldRethrowException(), and waitingTasks_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

1062  {
1063  std::exception_ptr exceptionPtr;
1064  if (iEPtr) {
1065  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_)) {
1066  exceptionPtr = iEPtr;
1067  setException<T::isEvent_>(exceptionPtr);
1068  } else {
1069  setPassed<T::isEvent_>();
1070  }
1072  } else {
1073  // Caught exception is propagated via WaitingTaskList
1074  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1075  exceptionPtr = std::current_exception();
1076  }
1077  }
1078  waitingTasks_.doneWaiting(exceptionPtr);
1079  return exceptionPtr;
1080  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110

◆ runModuleDirectly()

template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1182 of file Worker.h.

References visDQMUpload::context, and timesVisited_.

Referenced by edm::Path::finished().

1185  {
1186  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1187  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1188  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1189  }
std::atomic< int > timesVisited_
Definition: Worker.h:601

◆ selectInputProcessBlocks()

virtual void edm::Worker::selectInputProcessBlocks ( ProductRegistry const &  ,
ProcessBlockHelperBase const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ serializeRunModule()

virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
private, pure virtual

◆ setActivityRegistry()

void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry > areg )

The signals are required to live longer than the last call to 'doWork'. This was done to improve performance based on profiling.

Definition at line 108 of file Worker.cc.

References actReg_.

108 { actReg_ = areg; }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614

◆ setEarlyDeleteHelper()

void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper * iHelper )

Definition at line 237 of file Worker.cc.

References earlyDeleteHelper_.

237 { earlyDeleteHelper_ = iHelper; }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616

◆ setException()

template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inline, private

Definition at line 345 of file Worker.h.

References cached_exception_, Exception, state_, and timesExcept_.

345  {
346  if (IS_EVENT) {
347  timesExcept_.fetch_add(1, std::memory_order_relaxed);
348  }
349  cached_exception_ = iException; // propagate_const<T> has no reset() function
350  state_ = Exception;
351  return cached_exception_;
352  }
std::atomic< int > timesExcept_
Definition: Worker.h:604
std::exception_ptr cached_exception_
Definition: Worker.h:612
std::atomic< State > state_
Definition: Worker.h:605

◆ setFailed()

template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inline, private

Definition at line 336 of file Worker.h.

References Fail, state_, and timesFailed_.

336  {
337  if (IS_EVENT) {
338  timesFailed_.fetch_add(1, std::memory_order_relaxed);
339  }
340  state_ = Fail;
341  return false;
342  }
std::atomic< int > timesFailed_
Definition: Worker.h:603
std::atomic< State > state_
Definition: Worker.h:605

◆ setPassed()

template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inline, private

Definition at line 327 of file Worker.h.

References Pass, state_, and timesPassed_.

327  {
328  if (IS_EVENT) {
329  timesPassed_.fetch_add(1, std::memory_order_relaxed);
330  }
331  state_ = Pass;
332  return true;
333  }
std::atomic< State > state_
Definition: Worker.h:605
std::atomic< int > timesPassed_
Definition: Worker.h:602

◆ shouldRethrowException()

bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent 
) const
private

Definition at line 110 of file Worker.cc.

References writedatasetfile::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

Referenced by runAcquire(), runAcquireAfterAsyncPrefetch(), runModule(), and runModuleAfterAsyncPrefetch().

110  {
111  // NOTE: the warning printed as a result of ignoring or failing
112  // a module will only be printed during the full true processing
113  // pass of this module
114 
115  // Get the action corresponding to this exception. However, if processing
116  // something other than an event (e.g. run, lumi) always rethrow.
117  if (not isEvent) {
118  return true;
119  }
120  try {
121  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
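122  } catch (cms::Exception& ex) {
123  exception_actions::ActionCodes action = actions_->find(ex.category());
124 
125  if (action == exception_actions::Rethrow) {
126  return true;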
127  }
128 
129  ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
130 
131  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
132  // as IgnoreCompletely, so any subsequent OutputModules are still run.
133  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
134  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
135  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
136  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
137  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
138  action == exception_actions::FailPath) {
139  action = exception_actions::IgnoreCompletely;
140  }
141  }
142  if (action == exception_actions::IgnoreCompletely) {
143  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
144  return false;
145  }
146  }
147  return true;
148  }
std::string const & category() const
Definition: Exception.cc:143
ExceptionToActionTable const * actions_
Definition: Worker.h:611
ModuleDescription const * description() const
Definition: Worker.h:198
exception_actions::ActionCodes find(const std::string &category) const
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ skipOnPath()

void edm::Worker::skipOnPath ( EventPrincipal const &  iEvent)

Definition at line 363 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), earlyDeleteHelper_, iEvent, numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

363  {
364  if (earlyDeleteHelper_) {
365  earlyDeleteHelper_->pathFinished(iEvent);
366  }
367  if (0 == --numberOfPathsLeftToRun_) {
368  waitingTasks_.doneWaiting(cached_exception_);
369  }
370  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
std::exception_ptr cached_exception_
Definition: Worker.h:612
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616

◆ state()

State edm::Worker::state ( ) const
inline

Definition at line 247 of file Worker.h.

References state_.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

247 { return state_; }
std::atomic< State > state_
Definition: Worker.h:605

◆ timesExcept()

int edm::Worker::timesExcept ( ) const
inline

Definition at line 246 of file Worker.h.

References timesExcept_.

246 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:604

◆ timesFailed()

int edm::Worker::timesFailed ( ) const
inline

Definition at line 245 of file Worker.h.

References timesFailed_.

245 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:603

◆ timesPass()

int edm::Worker::timesPass ( ) const
inline

Definition at line 249 of file Worker.h.

References timesPassed().

249 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:244

◆ timesPassed()

int edm::Worker::timesPassed ( ) const
inline

Definition at line 244 of file Worker.h.

References timesPassed_.

Referenced by timesPass().

244 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:602

◆ timesRun()

int edm::Worker::timesRun ( ) const
inline

Definition at line 242 of file Worker.h.

References timesRun_.

242 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:600

◆ timesVisited()

int edm::Worker::timesVisited ( ) const
inline

Definition at line 243 of file Worker.h.

References timesVisited_.

243 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:601

◆ transformIndex()

size_t edm::Worker::transformIndex ( edm::BranchDescription const &  ) const
pure virtual

◆ updateLookup() [1/2]

virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ updateLookup() [2/2]

virtual void edm::Worker::updateLookup ( eventsetup::ESRecordsToProxyIndices const &  )
pure virtual

Implemented in edm::WorkerT< T >.

◆ waitingTaskList()

edm::WaitingTaskList& edm::Worker::waitingTaskList ( )
inline

Definition at line 254 of file Worker.h.

References waitingTasks_.

254 { return waitingTasks_; }
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618

◆ wantsGlobalLuminosityBlocks()

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual

◆ wantsGlobalRuns()

virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual

◆ wantsInputProcessBlocks()

virtual bool edm::Worker::wantsInputProcessBlocks ( ) const
pure virtual

◆ wantsProcessBlocks()

virtual bool edm::Worker::wantsProcessBlocks ( ) const
pure virtual

◆ wantsStreamLuminosityBlocks()

virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual

◆ wantsStreamRuns()

virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual

◆ workerType()

virtual std::string edm::Worker::workerType ( ) const
protectedpure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

◆ workerhelper::CallImpl

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 258 of file Worker.h.

Member Data Documentation

◆ actions_

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 611 of file Worker.h.

Referenced by shouldRethrowException().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

◆ cached_exception_

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 612 of file Worker.h.

Referenced by reset(), runModule(), setException(), and skipOnPath().

◆ earlyDeleteHelper_

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 616 of file Worker.h.

Referenced by postDoEvent(), setEarlyDeleteHelper(), and skipOnPath().

◆ moduleCallingContext_

ModuleCallingContext edm::Worker::moduleCallingContext_
private

◆ moduleValid_

bool edm::Worker::moduleValid_ = true
private

Definition at line 621 of file Worker.h.

Referenced by clearModule(), and description().

◆ numberOfPathsLeftToRun_

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 607 of file Worker.h.

Referenced by reset(), and skipOnPath().

◆ numberOfPathsOn_

int edm::Worker::numberOfPathsOn_
private

Definition at line 606 of file Worker.h.

Referenced by addedToPath(), and reset().

◆ ranAcquireWithoutException_

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 620 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

◆ state_

std::atomic<State> edm::Worker::state_
private

◆ timesExcept_

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 604 of file Worker.h.

Referenced by clearCounters(), setException(), and timesExcept().

◆ timesFailed_

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 603 of file Worker.h.

Referenced by clearCounters(), setFailed(), and timesFailed().

◆ timesPassed_

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 602 of file Worker.h.

Referenced by clearCounters(), setPassed(), and timesPassed().

◆ timesRun_

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 600 of file Worker.h.

Referenced by clearCounters(), prePrefetchSelectionAsync(), runAcquire(), runModule(), and timesRun().

◆ timesVisited_

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 601 of file Worker.h.

Referenced by clearCounters(), doWorkAsync(), runModuleDirectly(), and timesVisited().

◆ waitingTasks_

edm::WaitingTaskList edm::Worker::waitingTasks_
private

◆ workStarted_

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 619 of file Worker.h.

Referenced by doWorkAsync(), doWorkNoPrefetchingAsync(), and reset().