CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference (abstract)

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 

Public Types

enum  ConcurrencyTypes {
  kGlobal, kLimited, kOne, kStream,
  kLegacy
}
 
enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTaskHolder task)
 
void clearCounters ()
 
void clearModule ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * description () const
 
template<typename T >
void doWorkAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual SerialTaskQueue * globalLuminosityBlocksQueue ()=0
 
virtual SerialTaskQueue * globalRunsQueue ()=0
 
virtual bool hasAccumulator () const =0
 
virtual ConcurrencyTypes moduleConcurrencyType () const =0
 
virtual void modulesWhoseProductsAreConsumed (std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToCloseOutputFile ()
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual void selectInputProcessBlocks (ProductRegistry const &, ProcessBlockHelperBase const &)=0
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath (EventPrincipal const &iEvent)
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual void updateLookup (eventsetup::ESRecordsToProxyIndices const &)=0
 
edm::WaitingTaskList & waitingTaskList ()
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsInputProcessBlocks () const =0
 
virtual bool wantsProcessBlocks () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void doClearModule ()=0
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoAccessInputProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual void implDoAcquire (EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
 
virtual bool implDoBegin (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBegin (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBeginProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEndProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoPrePrefetchSelection (StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void edPrefetchAsync (WaitingTaskHolder, ServiceToken const &, Principal const &) const
 
void emitPostModuleEventPrefetchingSignal ()
 
void emitPostModuleGlobalPrefetchingSignal ()
 
void emitPostModuleStreamPrefetchingSignal ()
 
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom (Transition) const =0
 
void esPrefetchAsync (WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
 
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom (Transition) const =0
 
std::exception_ptr handleExternalWorkException (std::exception_ptr iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToCloseOutputFile ()=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
bool needsESPrefetching (Transition iTrans) const noexcept
 
virtual void preActionBeforeRunEventAsync (WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
template<typename T >
void prefetchAsync (WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
 
void runAcquire (EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
 
template<typename T >
bool runModule (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
bool moduleValid_ = true
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 92 of file Worker.h.

Member Enumeration Documentation

◆ ConcurrencyTypes

Enumerator
kGlobal 
kLimited 
kOne 
kStream 
kLegacy 

Definition at line 96 of file Worker.h.

◆ State

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 94 of file Worker.h.

◆ Types

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 95 of file Worker.h.

Constructor & Destructor Documentation

◆ Worker() [1/2]

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 89 of file Worker.cc.

90  : timesRun_(0),
91  timesVisited_(0),
92  timesPassed_(0),
93  timesFailed_(0),
94  timesExcept_(0),
95  state_(Ready),
99  actions_(iActions),
101  actReg_(),
102  earlyDeleteHelper_(nullptr),
103  workStarted_(false),
std::atomic< int > timesVisited_
Definition: Worker.h:586
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:592
std::atomic< int > timesExcept_
Definition: Worker.h:589
std::atomic< bool > workStarted_
Definition: Worker.h:604
std::atomic< int > timesFailed_
Definition: Worker.h:588
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
ExceptionToActionTable const * actions_
Definition: Worker.h:596
bool ranAcquireWithoutException_
Definition: Worker.h:605
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
std::exception_ptr cached_exception_
Definition: Worker.h:597
std::atomic< State > state_
Definition: Worker.h:590
int numberOfPathsOn_
Definition: Worker.h:591
std::atomic< int > timesPassed_
Definition: Worker.h:587
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:601
std::atomic< int > timesRun_
Definition: Worker.h:585

◆ ~Worker()

edm::Worker::~Worker ( )
virtual

Definition at line 106 of file Worker.cc.

106 {}

◆ Worker() [2/2]

edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

◆ activityRegistry()

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 282 of file Worker.h.

References actReg_.

282 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599

◆ addedToPath()

void edm::Worker::addedToPath ( )
inline

Definition at line 231 of file Worker.h.

References numberOfPathsOn_.

231 { ++numberOfPathsOn_; }
int numberOfPathsOn_
Definition: Worker.h:591

◆ beginJob()

void edm::Worker::beginJob ( void  )

Definition at line 247 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob().

247  {
248  try {
249  convertException::wrap([&]() {
250  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
251  implBeginJob();
252  });
253  } catch (cms::Exception& ex) {
254  state_ = Exception;
255  std::ostringstream ost;
256  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
257  << "'";
258  ex.addContext(ost.str());
259  throw;
260  }
261  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
std::string const & moduleName() const
std::atomic< State > state_
Definition: Worker.h:590
ModuleDescription const * description() const
Definition: Worker.h:189
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:165
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ beginStream()

void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 280 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

280  {
281  try {
282  convertException::wrap([&]() {
283  streamContext.setTransition(StreamContext::Transition::kBeginStream);
284  streamContext.setEventID(EventID(0, 0, 0));
285  streamContext.setRunIndex(RunIndex::invalidRunIndex());
286  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
287  streamContext.setTimestamp(Timestamp());
288  ParentContext parentContext(&streamContext);
289  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
290  moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
291  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
292  implBeginStream(id);
293  });
294  } catch (cms::Exception& ex) {
295  state_ = Exception;
296  std::ostringstream ost;
297  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
298  << "'";
299  ex.addContext(ost.str());
300  throw;
301  }
302  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:590
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:189
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ callWhenDoneAsync()

void edm::Worker::callWhenDoneAsync ( WaitingTaskHolder  task)
inline

Definition at line 168 of file Worker.h.

References edm::WaitingTaskList::add(), eostools::move(), TrackValidation_cff::task, and waitingTasks_.

Referenced by edm::SwitchProducerProductResolver::prefetchAsync_().

edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
def move(src, dest)
Definition: eostools.py:511

◆ clearCounters()

void edm::Worker::clearCounters ( )
inline

Definition at line 223 of file Worker.h.

References timesExcept_, timesFailed_, timesPassed_, timesRun_, and timesVisited_.

Referenced by edm::StreamSchedule::clearCounters().

223  {
224  timesRun_.store(0, std::memory_order_release);
225  timesVisited_.store(0, std::memory_order_release);
226  timesPassed_.store(0, std::memory_order_release);
227  timesFailed_.store(0, std::memory_order_release);
228  timesExcept_.store(0, std::memory_order_release);
229  }
std::atomic< int > timesVisited_
Definition: Worker.h:586
std::atomic< int > timesExcept_
Definition: Worker.h:589
std::atomic< int > timesFailed_
Definition: Worker.h:588
std::atomic< int > timesPassed_
Definition: Worker.h:587
std::atomic< int > timesRun_
Definition: Worker.h:585

◆ clearModule()

void edm::Worker::clearModule ( )
inline

Definition at line 123 of file Worker.h.

References doClearModule(), and moduleValid_.

123  {
124  moduleValid_ = false;
125  doClearModule();
126  }
bool moduleValid_
Definition: Worker.h:606
virtual void doClearModule()=0

◆ consumesInfo()

virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual

◆ convertCurrentProcessAlias()

virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

◆ description()

ModuleDescription const* edm::Worker::description ( ) const
inline

◆ doClearModule()

virtual void edm::Worker::doClearModule ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by clearModule().

◆ doWorkAsync()

template<typename T >
void edm::Worker::doWorkAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 953 of file Worker.h.

References edm::WaitingTaskList::add(), visDQMUpload::context, watchdog::group, hasAcquire(), info(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), moduleCallingContext_, eostools::move(), AlCaHLTBitMon_ParallelJobs::p, prePrefetchSelectionAsync(), fetchall_from_DQM_v2::release, alignCSCRings::s, edm::ModuleCallingContext::setContext(), submitPVValidationJobs::t, TrackValidation_cff::task, timesVisited_, unpackBuffers-CaloStage2::token, waitingTasks_, and workStarted_.

Referenced by edm::UnscheduledProductResolver::prefetchAsync_(), and edm::WorkerInPath::runWorkerAsync().

958  {
959  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
960  return;
961  }
962 
963  //Need to check workStarted_ before adding to waitingTasks_
964  bool expected = false;
965  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
966 
968  if constexpr (T::isEvent_) {
969  timesVisited_.fetch_add(1, std::memory_order_relaxed);
970  }
971 
972  if (workStarted) {
974 
975  //if have TriggerResults based selection we want to reject the event before doing prefetching
976  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
977  //We need to run the selection in a different task so that
978  // we can prefetch the data needed for the selection
979  WaitingTask* moduleTask =
980  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
981 
982  //make sure the task is either run or destroyed
983  struct DestroyTask {
984  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
985 
986  ~DestroyTask() {
987  auto p = m_task.exchange(nullptr);
988  if (p) {
989  TaskSentry s{p};
990  }
991  }
992 
993  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
994 
995  private:
996  std::atomic<edm::WaitingTask*> m_task;
997  };
998  if constexpr (T::isEvent_) {
999  if (hasAcquire()) {
1000  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1001  ServiceWeakToken weakToken = token;
1002  auto* group = task.group();
1003  moduleTask = make_waiting_task(
1004  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1005  WaitingTaskWithArenaHolder runTaskHolder(
1006  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1007  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1008  t.execute();
1009  });
1010  }
1011  }
1012  auto* group = task.group();
1013  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1014  ServiceWeakToken weakToken = token;
1015  auto selectionTask =
1016  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1017  std::exception_ptr const*) mutable {
1018  ServiceRegistry::Operate guard(weakToken.lock());
1019  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1020  weakToken.lock(),
1021  parentContext,
1022  info,
1023  T::transition_);
1024  });
1025  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1026  } else {
1027  WaitingTask* moduleTask =
1028  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1029  auto group = task.group();
1030  if constexpr (T::isEvent_) {
1031  if (hasAcquire()) {
1032  WaitingTaskWithArenaHolder runTaskHolder(
1033  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1034  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1035  }
1036  }
1037  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1038  }
1039  }
1040  }
virtual bool hasAcquire() const =0
std::atomic< int > timesVisited_
Definition: Worker.h:586
static const TGPicture * info(bool iBackgroundIsBlack)
std::atomic< bool > workStarted_
Definition: Worker.h:604
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
def move(src, dest)
Definition: eostools.py:511

◆ doWorkNoPrefetchingAsync()

template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  serviceToken,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1068 of file Worker.h.

References edm::WaitingTaskList::add(), CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), esPrefetchAsync(), watchdog::group, info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), needsESPrefetching(), createBeamHaloJobs::queue, serializeRunModule(), TrackValidation_cff::task, waitingTasks_, and workStarted_.

Referenced by edm::WorkerInPath::runWorkerAsync().

1073  {
1074  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1075  return;
1076  }
1077 
1078  //Need to check workStarted_ before adding to waitingTasks_
1079  bool expected = false;
1080  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1081 
1083  if (workStarted) {
1084  ServiceWeakToken weakToken = serviceToken;
1085  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1086  std::exception_ptr exceptionPtr;
1087  // Caught exception is propagated via WaitingTaskList
1088  CMS_SA_ALLOW try {
1089  //Need to make the services available
1090  ServiceRegistry::Operate guard(weakToken.lock());
1091 
1092  this->runModule<T>(info, streamID, parentContext, context);
1093  } catch (...) {
1094  exceptionPtr = std::current_exception();
1095  }
1096  this->waitingTasks_.doneWaiting(exceptionPtr);
1097  };
1098 
1099  if (needsESPrefetching(T::transition_)) {
1100  auto group = task.group();
1101  auto afterPrefetch =
1102  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1103  if (iExcept) {
1104  this->waitingTasks_.doneWaiting(*iExcept);
1105  } else {
1106  if (auto queue = this->serializeRunModule()) {
1107  queue.push(*group, toDo);
1108  } else {
1109  group->run(toDo);
1110  }
1111  }
1112  });
1113  esPrefetchAsync(
1114  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1115  } else {
1116  auto group = task.group();
1117  if (auto queue = this->serializeRunModule()) {
1118  queue.push(*group, toDo);
1119  } else {
1120  group->run(toDo);
1121  }
1122  }
1123  }
1124  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::atomic< bool > workStarted_
Definition: Worker.h:604
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:349
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
virtual TaskQueueAdaptor serializeRunModule()=0
def move(src, dest)
Definition: eostools.py:511

◆ edPrefetchAsync()

void edm::Worker::edPrefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
Principal const &  iPrincipal 
) const
private

Definition at line 224 of file Worker.cc.

References edm::Principal::branchType(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetFrom(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and unpackBuffers-CaloStage2::token.

Referenced by prefetchAsync().

224  {
225  // Prefetch products the module declares it consumes
226  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
227 
228  for (auto const& item : items) {
229  ProductResolverIndex productResolverIndex = item.productResolverIndex();
230  bool skipCurrentProcess = item.skipCurrentProcess();
231  if (productResolverIndex != ProductResolverIndexAmbiguous) {
232  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
233  }
234  }
235  }
unsigned int ProductResolverIndex
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594

◆ emitPostModuleEventPrefetchingSignal()

void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 353 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

353  {
354  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
355  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
StreamContext const * getStreamContext() const

◆ emitPostModuleGlobalPrefetchingSignal()

void edm::Worker::emitPostModuleGlobalPrefetchingSignal ( )
inline private

Definition at line 362 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getGlobalContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

362  {
363  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
364  moduleCallingContext_);
365  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
GlobalContext const * getGlobalContext() const

◆ emitPostModuleStreamPrefetchingSignal()

void edm::Worker::emitPostModuleStreamPrefetchingSignal ( )
inline private

Definition at line 357 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

357  {
358  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
359  moduleCallingContext_);
360  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
StreamContext const * getStreamContext() const

◆ endJob()

void edm::Worker::endJob ( void  )

Definition at line 263 of file Worker.cc.

References actReg_, cms::Exception::addContext(), cms::cuda::assert(), submitPVResolutionJobs::desc, description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

263  {
264  try {
265  convertException::wrap([&]() {
266  ModuleDescription const* desc = description();
267  assert(desc != nullptr);
268  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
269  implEndJob();
270  });
271  } catch (cms::Exception& ex) {
272  state_ = Exception;
273  std::ostringstream ost;
274  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
275  ex.addContext(ost.str());
276  throw;
277  }
278  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
std::string const & moduleName() const
assert(be >=bs)
std::atomic< State > state_
Definition: Worker.h:590
virtual void implEndJob()=0
ModuleDescription const * description() const
Definition: Worker.h:189
void addContext(std::string const &context)
Definition: Exception.cc:165
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ endStream()

void edm::Worker::endStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 304 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

304  {
305  try {
306  convertException::wrap([&]() {
307  streamContext.setTransition(StreamContext::Transition::kEndStream);
308  streamContext.setEventID(EventID(0, 0, 0));
309  streamContext.setRunIndex(RunIndex::invalidRunIndex());
310  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
311  streamContext.setTimestamp(Timestamp());
312  ParentContext parentContext(&streamContext);
313  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
314  moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
315  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
316  implEndStream(id);
317  });
318  } catch (cms::Exception& ex) {
319  state_ = Exception;
320  std::ostringstream ost;
321  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
322  << "'";
323  ex.addContext(ost.str());
324  throw;
325  }
326  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:590
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:189
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ esItemsToGetFrom()

virtual std::vector<ESProxyIndex> const& edm::Worker::esItemsToGetFrom ( Transition  ) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync(), and needsESPrefetching().

◆ esPrefetchAsync()

void edm::Worker::esPrefetchAsync ( WaitingTaskHolder  iTask,
EventSetupImpl const &  iImpl,
Transition  iTrans,
ServiceToken const &  iToken 
)
private

Definition at line 199 of file Worker.cc.

References cms::cuda::assert(), esItemsToGetFrom(), esRecordsToGetFrom(), edm::EventSetupImpl::findImpl(), mps_fire::i, mps_monitormerge::items, moduleCallingContext_, edm::NumberOfEventSetupTransitions, and edm::eventsetup::EventSetupRecordImpl::prefetchAsync().

Referenced by doWorkNoPrefetchingAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), and edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync().

202  {
204  return;
205  }
206  auto const& recs = esRecordsToGetFrom(iTrans);
207  auto const& items = esItemsToGetFrom(iTrans);
208 
209  assert(items.size() == recs.size());
210  if (items.empty()) {
211  return;
212  }
213 
214  for (size_t i = 0; i != items.size(); ++i) {
215  if (recs[i] != ESRecordIndex{}) {
216  auto rec = iImpl.findImpl(recs[i]);
217  if (rec) {
218  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
219  }
220  }
221  }
222  }
assert(be >=bs)
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0

◆ esRecordsToGetFrom()

virtual std::vector<ESRecordIndex> const& edm::Worker::esRecordsToGetFrom ( Transition  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync().

◆ globalLuminosityBlocksQueue()

virtual SerialTaskQueue* edm::Worker::globalLuminosityBlocksQueue ( )
pure virtual

◆ globalRunsQueue()

virtual SerialTaskQueue* edm::Worker::globalRunsQueue ( )
pure virtual

◆ handleExternalWorkException()

std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 391 of file Worker.cc.

References edm::exceptionContext(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

391  {
393  try {
394  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
395  } catch (cms::Exception& ex) {
396  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
398  return std::current_exception();
399  }
400  }
401  return iEPtr;
402  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool ranAcquireWithoutException_
Definition: Worker.h:605
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
auto wrap(F iFunc) -> decltype(iFunc())

◆ hasAccumulator()

virtual bool edm::Worker::hasAccumulator ( ) const
pure virtual

◆ hasAcquire()

virtual bool edm::Worker::hasAcquire ( ) const
private pure virtual

◆ implBeginJob()

virtual void edm::Worker::implBeginJob ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

◆ implBeginStream()

virtual void edm::Worker::implBeginStream ( StreamID  )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

◆ implDo()

virtual bool edm::Worker::implDo ( EventTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoAccessInputProcessBlock()

virtual bool edm::Worker::implDoAccessInputProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoAcquire()

virtual void edm::Worker::implDoAcquire ( EventTransitionInfo const &  ,
ModuleCallingContext const *  ,
WaitingTaskWithArenaHolder & 
)
protected pure virtual

◆ implDoBegin() [1/2]

virtual bool edm::Worker::implDoBegin ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoBegin() [2/2]

virtual bool edm::Worker::implDoBegin ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoBeginProcessBlock()

virtual bool edm::Worker::implDoBeginProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoEnd() [1/2]

virtual bool edm::Worker::implDoEnd ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoEnd() [2/2]

virtual bool edm::Worker::implDoEnd ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoEndProcessBlock()

virtual bool edm::Worker::implDoEndProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoPrePrefetchSelection()

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  ,
EventPrincipal const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoStreamBegin() [1/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoStreamBegin() [2/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoStreamEnd() [1/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

◆ implDoStreamEnd() [2/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected pure virtual

Implemented in edm::WorkerT< T >.

◆ implEndJob()

virtual void edm::Worker::implEndJob ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

◆ implEndStream()

virtual void edm::Worker::implEndStream ( StreamID  )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

◆ implNeedToRunSelection()

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protected pure virtual

◆ implRegisterThinnedAssociations()

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper & 
)
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by registerThinnedAssociations().

◆ implRespondToCloseInputFile()

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseInputFile().

◆ implRespondToCloseOutputFile()

virtual void edm::Worker::implRespondToCloseOutputFile ( )
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseOutputFile().

◆ implRespondToOpenInputFile()

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToOpenInputFile().

◆ itemsMayGet()

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsShouldPutInEvent()

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
private pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGet()

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGetForSelection()

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protected pure virtual

◆ itemsToGetFrom()

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edPrefetchAsync().

◆ moduleConcurrencyType()

virtual ConcurrencyTypes edm::Worker::moduleConcurrencyType ( ) const
pure virtual

◆ modulesWhoseProductsAreConsumed()

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &  modules,
std::vector< ModuleProcessName > &  modulesInPreviousProcesses,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const *> const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

◆ moduleType()

virtual Types edm::Worker::moduleType ( ) const
pure virtual

◆ needsESPrefetching()

bool edm::Worker::needsESPrefetching ( Transition  iTrans) const
inline private noexcept

Definition at line 349 of file Worker.h.

References esItemsToGetFrom(), and edm::NumberOfEventSetupTransitions.

Referenced by doWorkNoPrefetchingAsync().

349  {
350  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
351  }
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0

◆ operator=()

Worker& edm::Worker::operator= ( Worker const &  )
delete

◆ postDoEvent()

void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 346 of file Worker.cc.

References earlyDeleteHelper_, and iEvent.

346  {
347  if (earlyDeleteHelper_) {
348  earlyDeleteHelper_->moduleRan(iEvent);
349  }
350  }
int iEvent
Definition: GenABIO.cc:224
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:601

◆ preActionBeforeRunEventAsync()

virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTaskHolder  iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

◆ prefetchAsync()

template<typename T >
void edm::Worker::prefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
ParentContext const &  parentContext,
typename T::TransitionInfoType const &  transitionInfo,
Transition  iTransition 
)
private

Definition at line 927 of file Worker.h.

References actReg_, edm::Principal::branchType(), edPrefetchAsync(), edm::ModuleCallingContext::getGlobalContext(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::ModuleCallingContext::setContext(), and unpackBuffers-CaloStage2::token.

931  {
932  Principal const& principal = transitionInfo.principal();
933 
935 
936  if constexpr (T::isEvent_) {
937  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
938  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
939  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
940  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
941  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
942  }
943 
944  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
945  edPrefetchAsync(iTask, token, principal);
946 
947  if (principal.branchType() == InEvent) {
949  }
950  }
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
StreamContext const * getStreamContext() const
GlobalContext const * getGlobalContext() const

◆ prePrefetchSelectionAsync() [1/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  group,
WaitingTask task,
ServiceToken const &  token,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 150 of file Worker.cc.

References CMS_SA_ALLOW, edm::TaskBase::decrement_ref_count(), edm::WaitingTaskList::doneWaiting(), edm::TaskBase::execute(), watchdog::group, implDoPrePrefetchSelection(), edm::TaskBase::increment_ref_count(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetForSelection(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, alignCSCRings::s, timesRun_, unpackBuffers-CaloStage2::token, and waitingTasks_.

Referenced by doWorkAsync().

154  {
155  successTask->increment_ref_count();
156 
157  ServiceWeakToken weakToken = token;
158  auto choiceTask =
159  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
160  ServiceRegistry::Operate guard(weakToken.lock());
161  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
162  CMS_SA_ALLOW try {
163  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
164  timesRun_.fetch_add(1, std::memory_order_relaxed);
165  setPassed<true>();
166  waitingTasks_.doneWaiting(nullptr);
167  //TBB requires that destroyed tasks have count 0
168  if (0 == successTask->decrement_ref_count()) {
169  TaskSentry s(successTask);
170  }
171  return;
172  }
173  } catch (...) {
174  }
175  if (0 == successTask->decrement_ref_count()) {
176  group.run([successTask]() {
177  TaskSentry s(successTask);
178  successTask->execute();
179  });
180  }
181  });
182 
183  WaitingTaskHolder choiceHolder{group, choiceTask};
184 
185  std::vector<ProductResolverIndexAndSkipBit> items;
187 
188  for (auto const& item : items) {
189  ProductResolverIndex productResolverIndex = item.productResolverIndex();
190  bool skipCurrentProcess = item.skipCurrentProcess();
191  if (productResolverIndex != ProductResolverIndexAmbiguous) {
192  iPrincipal->prefetchAsync(
193  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
194  }
195  }
196  choiceHolder.doneWaiting(std::exception_ptr{});
197  }
#define CMS_SA_ALLOW
unsigned int ProductResolverIndex
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< int > timesRun_
Definition: Worker.h:585

◆ prePrefetchSelectionAsync() [2/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  ,
WaitingTask task,
ServiceToken const &  ,
StreamID  stream,
void const *   
)
inline

Definition at line 141 of file Worker.h.

References cms::cuda::assert().

142  {
143  assert(false);
144  }
assert(be >=bs)

◆ registerThinnedAssociations()

void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper & helper 
)

Definition at line 328 of file Worker.cc.

References cms::Exception::addContext(), description(), implRegisterThinnedAssociations(), and HerwigMaxPtPartonFilter_cfi::moduleLabel.

328  {
329  try {
331  } catch (cms::Exception& ex) {
332  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
333  throw ex;
334  }
335  }
Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ModuleDescription const * description() const
Definition: Worker.h:189
void addContext(std::string const &context)
Definition: Exception.cc:165

◆ reset()

void edm::Worker::reset ( void  )
inline

Definition at line 179 of file Worker.h.

References cached_exception_, numberOfPathsLeftToRun_, numberOfPathsOn_, Ready, edm::WaitingTaskList::reset(), state_, waitingTasks_, and workStarted_.

Referenced by edm::WorkerManager::resetAll().

179  {
180  cached_exception_ = std::exception_ptr();
181  state_ = Ready;
183  workStarted_ = false;
185  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:592
std::atomic< bool > workStarted_
Definition: Worker.h:604
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:597
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
std::atomic< State > state_
Definition: Worker.h:590
int numberOfPathsOn_
Definition: Worker.h:591

◆ resetModuleDescription()

void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected

◆ resolvePutIndicies()

virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ respondToCloseInputFile()

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 175 of file Worker.h.

References implRespondToCloseInputFile().

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0

◆ respondToCloseOutputFile()

void edm::Worker::respondToCloseOutputFile ( )
inline

Definition at line 176 of file Worker.h.

References implRespondToCloseOutputFile().

virtual void implRespondToCloseOutputFile()=0

◆ respondToOpenInputFile()

void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 174 of file Worker.h.

References implRespondToOpenInputFile().

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0

◆ runAcquire()

void edm::Worker::runAcquire ( EventTransitionInfo const &  info,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder & holder 
)
private

Definition at line 352 of file Worker.cc.

References edm::exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

354  {
355  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
356  try {
357  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
358  } catch (cms::Exception& ex) {
360  if (shouldRethrowException(std::current_exception(), parentContext, true)) {
361  timesRun_.fetch_add(1, std::memory_order_relaxed);
362  throw;
363  }
364  }
365  }
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
static const TGPicture * info(bool iBackgroundIsBlack)
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:585

◆ runAcquireAfterAsyncPrefetch()

void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
EventTransitionInfo const &  eventTransitionInfo,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 367 of file Worker.cc.

References CMS_SA_ALLOW, edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), and shouldRethrowException().

370  {
372  std::exception_ptr exceptionPtr;
373  if (iEPtr) {
374  if (shouldRethrowException(iEPtr, parentContext, true)) {
375  exceptionPtr = iEPtr;
376  }
378  } else {
379  // Caught exception is propagated via WaitingTaskWithArenaHolder
380  CMS_SA_ALLOW try {
381  runAcquire(eventTransitionInfo, parentContext, holder);
383  } catch (...) {
384  exceptionPtr = std::current_exception();
385  }
386  }
387  // It is important this is after runAcquire completely finishes
388  holder.doneWaiting(exceptionPtr);
389  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:605
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:352

◆ runModule()

template<typename T >
bool edm::Worker::runModule ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1127 of file Worker.h.

References actReg_, cms::cuda::assert(), cached_exception_, visDQMUpload::context, edm::exceptionContext(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

1130  {
1131  //unscheduled producers should advance this
1132  //if (T::isEvent_) {
1133  // ++timesVisited_;
1134  //}
1135  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1136  if constexpr (T::isEvent_) {
1137  timesRun_.fetch_add(1, std::memory_order_relaxed);
1138  }
1139 
1140  bool rc = true;
1141  try {
1142  convertException::wrap([&]() {
1143  rc = workerhelper::CallImpl<T>::call(
1144  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1145 
1146  if (rc) {
1147  setPassed<T::isEvent_>();
1148  } else {
1149  setFailed<T::isEvent_>();
1150  }
1151  });
1152  } catch (cms::Exception& ex) {
1154  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1156  setException<T::isEvent_>(std::current_exception());
1157  std::rethrow_exception(cached_exception_);
1158  } else {
1159  rc = setPassed<T::isEvent_>();
1160  }
1161  }
1162 
1163  return rc;
1164  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
std::exception_ptr cached_exception_
Definition: Worker.h:597
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:585

◆ runModuleAfterAsyncPrefetch()

template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1043 of file Worker.h.

References CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, edm::ModuleCallingContext::setContext(), shouldRethrowException(), and waitingTasks_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

1047  {
1048  std::exception_ptr exceptionPtr;
1049  if (iEPtr) {
1050  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_)) {
1051  exceptionPtr = iEPtr;
1052  setException<T::isEvent_>(exceptionPtr);
1053  } else {
1054  setPassed<T::isEvent_>();
1055  }
1057  } else {
1058  // Caught exception is propagated via WaitingTaskList
1059  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1060  exceptionPtr = std::current_exception();
1061  }
1062  }
1063  waitingTasks_.doneWaiting(exceptionPtr);
1064  return exceptionPtr;
1065  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110

◆ runModuleDirectly()

template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1167 of file Worker.h.

References visDQMUpload::context, and timesVisited_.

Referenced by edm::Path::finished().

1170  {
1171  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1172  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1173  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1174  }
std::atomic< int > timesVisited_
Definition: Worker.h:586

◆ selectInputProcessBlocks()

virtual void edm::Worker::selectInputProcessBlocks ( ProductRegistry const &  ,
ProcessBlockHelperBase const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ serializeRunModule()

virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
private pure virtual

◆ setActivityRegistry()

void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry > areg)

The signals are required to live longer than the last call to 'doWork'; this was done to improve performance, based on profiling.

Definition at line 108 of file Worker.cc.

References actReg_.

108 { actReg_ = areg; }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599

◆ setEarlyDeleteHelper()

void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper * iHelper)

Definition at line 237 of file Worker.cc.

References earlyDeleteHelper_.

237 { earlyDeleteHelper_ = iHelper; }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:601

◆ setException()

template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inline private

Definition at line 330 of file Worker.h.

References cached_exception_, Exception, state_, and timesExcept_.

330  {
331  if (IS_EVENT) {
332  timesExcept_.fetch_add(1, std::memory_order_relaxed);
333  }
334  cached_exception_ = iException; // propagate_const<T> has no reset() function
335  state_ = Exception;
336  return cached_exception_;
337  }
std::atomic< int > timesExcept_
Definition: Worker.h:589
std::exception_ptr cached_exception_
Definition: Worker.h:597
std::atomic< State > state_
Definition: Worker.h:590

◆ setFailed()

template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inline private

Definition at line 321 of file Worker.h.

References Fail, state_, and timesFailed_.

321  {
322  if (IS_EVENT) {
323  timesFailed_.fetch_add(1, std::memory_order_relaxed);
324  }
325  state_ = Fail;
326  return false;
327  }
std::atomic< int > timesFailed_
Definition: Worker.h:588
std::atomic< State > state_
Definition: Worker.h:590

◆ setPassed()

template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inline private

Definition at line 312 of file Worker.h.

References Pass, state_, and timesPassed_.

312  {
313  if (IS_EVENT) {
314  timesPassed_.fetch_add(1, std::memory_order_relaxed);
315  }
316  state_ = Pass;
317  return true;
318  }
std::atomic< State > state_
Definition: Worker.h:590
std::atomic< int > timesPassed_
Definition: Worker.h:587

◆ shouldRethrowException()

bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent 
) const
private

Definition at line 110 of file Worker.cc.

References writedatasetfile::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

Referenced by runAcquire(), runAcquireAfterAsyncPrefetch(), runModule(), and runModuleAfterAsyncPrefetch().

110  {
111  // NOTE: the warning printed as a result of ignoring or failing
112  // a module will only be printed during the full true processing
113  // pass of this module
114 
115  // Get the action corresponding to this exception. However, if processing
116  // something other than an event (e.g. run, lumi) always rethrow.
117  if (not isEvent) {
118  return true;
119  }
120  try {
121  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
122  } catch (cms::Exception& ex) {
124 
126  return true;
127  }
128 
129  ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
130 
131  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
132  // as IgnoreCompletely, so any subsequent OutputModules are still run.
133  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
134  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
135  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
136  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
137  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
140  }
141  }
143  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
144  return false;
145  }
146  }
147  return true;
148  }
std::string const & category() const
Definition: Exception.cc:143
ExceptionToActionTable const * actions_
Definition: Worker.h:596
ModuleDescription const * description() const
Definition: Worker.h:189
exception_actions::ActionCodes find(const std::string &category) const
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ skipOnPath()

void edm::Worker::skipOnPath ( EventPrincipal const &  iEvent)

Definition at line 337 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), earlyDeleteHelper_, iEvent, numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

337  {
338  if (earlyDeleteHelper_) {
339  earlyDeleteHelper_->pathFinished(iEvent);
340  }
341  if (0 == --numberOfPathsLeftToRun_) {
342  waitingTasks_.doneWaiting(cached_exception_);
343  }
344  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:592
std::exception_ptr cached_exception_
Definition: Worker.h:597
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:601

◆ state()

State edm::Worker::state ( ) const
inline

Definition at line 238 of file Worker.h.

References state_.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

238 { return state_; }
std::atomic< State > state_
Definition: Worker.h:590

◆ timesExcept()

int edm::Worker::timesExcept ( ) const
inline

Definition at line 237 of file Worker.h.

References timesExcept_.

237 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:589

◆ timesFailed()

int edm::Worker::timesFailed ( ) const
inline

Definition at line 236 of file Worker.h.

References timesFailed_.

236 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:588

◆ timesPass()

int edm::Worker::timesPass ( ) const
inline

Definition at line 240 of file Worker.h.

References timesPassed().

240 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:235

◆ timesPassed()

int edm::Worker::timesPassed ( ) const
inline

Definition at line 235 of file Worker.h.

References timesPassed_.

Referenced by timesPass().

235 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:587

◆ timesRun()

int edm::Worker::timesRun ( ) const
inline

Definition at line 233 of file Worker.h.

References timesRun_.

233 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:585

◆ timesVisited()

int edm::Worker::timesVisited ( ) const
inline

Definition at line 234 of file Worker.h.

References timesVisited_.

234 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:586

◆ updateLookup() [1/2]

virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ updateLookup() [2/2]

virtual void edm::Worker::updateLookup ( eventsetup::ESRecordsToProxyIndices const &  )
pure virtual

Implemented in edm::WorkerT< T >.

◆ waitingTaskList()

edm::WaitingTaskList& edm::Worker::waitingTaskList ( )
inline

Definition at line 245 of file Worker.h.

References waitingTasks_.

245 { return waitingTasks_; }
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603

◆ wantsGlobalLuminosityBlocks()

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual

◆ wantsGlobalRuns()

virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual

◆ wantsInputProcessBlocks()

virtual bool edm::Worker::wantsInputProcessBlocks ( ) const
pure virtual

◆ wantsProcessBlocks()

virtual bool edm::Worker::wantsProcessBlocks ( ) const
pure virtual

◆ wantsStreamLuminosityBlocks()

virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual

◆ wantsStreamRuns()

virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual

◆ workerType()

virtual std::string edm::Worker::workerType ( ) const
protected pure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

◆ workerhelper::CallImpl

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 249 of file Worker.h.

Member Data Documentation

◆ actions_

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 596 of file Worker.h.

Referenced by shouldRethrowException().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

◆ cached_exception_

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 597 of file Worker.h.

Referenced by reset(), runModule(), setException(), and skipOnPath().

◆ earlyDeleteHelper_

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 601 of file Worker.h.

Referenced by postDoEvent(), setEarlyDeleteHelper(), and skipOnPath().

◆ moduleCallingContext_

ModuleCallingContext edm::Worker::moduleCallingContext_
private

◆ moduleValid_

bool edm::Worker::moduleValid_ = true
private

Definition at line 606 of file Worker.h.

Referenced by clearModule(), and description().

◆ numberOfPathsLeftToRun_

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 592 of file Worker.h.

Referenced by reset(), and skipOnPath().

◆ numberOfPathsOn_

int edm::Worker::numberOfPathsOn_
private

Definition at line 591 of file Worker.h.

Referenced by addedToPath(), and reset().

◆ ranAcquireWithoutException_

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 605 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

◆ state_

std::atomic<State> edm::Worker::state_
private

◆ timesExcept_

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 589 of file Worker.h.

Referenced by clearCounters(), setException(), and timesExcept().

◆ timesFailed_

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 588 of file Worker.h.

Referenced by clearCounters(), setFailed(), and timesFailed().

◆ timesPassed_

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 587 of file Worker.h.

Referenced by clearCounters(), setPassed(), and timesPassed().

◆ timesRun_

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 585 of file Worker.h.

Referenced by clearCounters(), prePrefetchSelectionAsync(), runAcquire(), runModule(), and timesRun().

◆ timesVisited_

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 586 of file Worker.h.

Referenced by clearCounters(), doWorkAsync(), runModuleDirectly(), and timesVisited().

◆ waitingTasks_

edm::WaitingTaskList edm::Worker::waitingTasks_
private

◆ workStarted_

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 604 of file Worker.h.

Referenced by doWorkAsync(), doWorkNoPrefetchingAsync(), and reset().