CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference abstract

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 

Public Types

enum  ConcurrencyTypes { kGlobal, kLimited, kOne, kStream }
 
enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTaskHolder task)
 
void clearCounters ()
 
void clearModule ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * description () const
 
void doTransformAsync (WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
 
template<typename T >
void doWorkAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual SerialTaskQueue * globalLuminosityBlocksQueue ()=0
 
virtual SerialTaskQueue * globalRunsQueue ()=0
 
virtual bool hasAccumulator () const =0
 
virtual ConcurrencyTypes moduleConcurrencyType () const =0
 
virtual void modulesWhoseProductsAreConsumed (std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToCloseOutputFile ()
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual void selectInputProcessBlocks (ProductRegistry const &, ProcessBlockHelperBase const &)=0
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath (EventPrincipal const &iEvent)
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual size_t transformIndex (edm::BranchDescription const &) const =0
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual void updateLookup (eventsetup::ESRecordsToProductResolverIndices const &)=0
 
edm::WaitingTaskList & waitingTaskList ()
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsInputProcessBlocks () const =0
 
virtual bool wantsProcessBlocks () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void doClearModule ()=0
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoAccessInputProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual void implDoAcquire (EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
 
virtual bool implDoBegin (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBegin (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBeginProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEndProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoPrePrefetchSelection (StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual void implDoTransformAsync (WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual ProductResolverIndex itemToGetForTransform (size_t iTransformIndex) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void checkForShouldTryToContinue (ModuleDescription const &)
 
void edPrefetchAsync (WaitingTaskHolder, ServiceToken const &, Principal const &) const
 
void emitPostModuleEventPrefetchingSignal ()
 
void emitPostModuleGlobalPrefetchingSignal ()
 
void emitPostModuleStreamPrefetchingSignal ()
 
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom (Transition) const =0
 
void esPrefetchAsync (WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
 
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom (Transition) const =0
 
std::exception_ptr handleExternalWorkException (std::exception_ptr iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToCloseOutputFile ()=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
bool needsESPrefetching (Transition iTrans) const noexcept
 
virtual void preActionBeforeRunEventAsync (WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
template<typename T >
void prefetchAsync (WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
 
void runAcquire (EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
 
template<typename T >
bool runModule (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool beginSucceeded_ = false
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
bool moduleValid_ = true
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
bool shouldTryToContinue_ = false
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 92 of file Worker.h.

Member Enumeration Documentation

◆ ConcurrencyTypes

Enumerator
kGlobal 
kLimited 
kOne 
kStream 

Definition at line 96 of file Worker.h.

◆ State

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 94 of file Worker.h.

◆ Types

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 95 of file Worker.h.

Constructor & Destructor Documentation

◆ Worker() [1/2]

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 90 of file Worker.cc.

References checkForShouldTryToContinue().

91  : timesRun_(0),
92  timesVisited_(0),
93  timesPassed_(0),
94  timesFailed_(0),
95  timesExcept_(0),
96  state_(Ready),
100  actions_(iActions),
102  actReg_(),
103  earlyDeleteHelper_(nullptr),
104  workStarted_(false),
107  }
std::atomic< int > timesVisited_
Definition: Worker.h:605
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
std::atomic< int > timesExcept_
Definition: Worker.h:608
std::atomic< bool > workStarted_
Definition: Worker.h:623
std::atomic< int > timesFailed_
Definition: Worker.h:607
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ExceptionToActionTable const * actions_
Definition: Worker.h:615
bool ranAcquireWithoutException_
Definition: Worker.h:624
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
std::exception_ptr cached_exception_
Definition: Worker.h:616
std::atomic< State > state_
Definition: Worker.h:609
int numberOfPathsOn_
Definition: Worker.h:610
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
std::atomic< int > timesPassed_
Definition: Worker.h:606
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ ~Worker()

edm::Worker::~Worker ( )
virtual

Definition at line 109 of file Worker.cc.

109 {}

◆ Worker() [2/2]

edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

◆ activityRegistry()

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 298 of file Worker.h.

References actReg_.

298 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618

◆ addedToPath()

void edm::Worker::addedToPath ( )
inline

Definition at line 240 of file Worker.h.

References numberOfPathsOn_.

240 { ++numberOfPathsOn_; }
int numberOfPathsOn_
Definition: Worker.h:610

◆ beginJob()

void edm::Worker::beginJob ( void  )

Definition at line 283 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob().

283  {
284  try {
285  convertException::wrap([&]() {
286  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
287  implBeginJob();
288  });
289  } catch (cms::Exception& ex) {
290  state_ = Exception;
291  std::ostringstream ost;
292  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
293  << "'";
294  ex.addContext(ost.str());
295  throw;
296  }
297  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
std::atomic< State > state_
Definition: Worker.h:609
ModuleDescription const * description() const
Definition: Worker.h:198
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ beginStream()

void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 316 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

316  {
317  try {
318  convertException::wrap([&]() {
319  streamContext.setTransition(StreamContext::Transition::kBeginStream);
320  streamContext.setEventID(EventID(0, 0, 0));
321  streamContext.setRunIndex(RunIndex::invalidRunIndex());
322  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
323  streamContext.setTimestamp(Timestamp());
324  ParentContext parentContext(&streamContext);
325  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
327  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
328  implBeginStream(id);
329  });
330  } catch (cms::Exception& ex) {
331  state_ = Exception;
332  std::ostringstream ost;
333  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
334  << "'";
335  ex.addContext(ost.str());
336  throw;
337  }
338  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:609
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ callWhenDoneAsync()

void edm::Worker::callWhenDoneAsync ( WaitingTaskHolder  task)
inline

Definition at line 177 of file Worker.h.

References edm::WaitingTaskList::add(), eostools::move(), TrackValidation_cff::task, and waitingTasks_.

Referenced by edm::SwitchProducerProductResolver::prefetchAsync_().

edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
def move(src, dest)
Definition: eostools.py:511

◆ checkForShouldTryToContinue()

void edm::Worker::checkForShouldTryToContinue ( ModuleDescription const &  iDesc)
private

Definition at line 113 of file Worker.cc.

References edm::pset::Registry::getMapped(), edm::pset::Registry::instance(), edm::ModuleDescription::parameterSetID(), muonDTDigis_cfi::pset, and shouldTryToContinue_.

Referenced by resetModuleDescription(), and Worker().

113  {
114  auto pset = edm::pset::Registry::instance()->getMapped(iDesc.parameterSetID());
115  if (pset and pset->exists("@shouldTryToContinue")) {
116  shouldTryToContinue_ = true;
117  }
118  }
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
static Registry * instance()
Definition: Registry.cc:12
bool shouldTryToContinue_
Definition: Worker.h:626

◆ clearCounters()

void edm::Worker::clearCounters ( )
inline

Definition at line 232 of file Worker.h.

References timesExcept_, timesFailed_, timesPassed_, timesRun_, and timesVisited_.

Referenced by edm::StreamSchedule::clearCounters().

232  {
233  timesRun_.store(0, std::memory_order_release);
234  timesVisited_.store(0, std::memory_order_release);
235  timesPassed_.store(0, std::memory_order_release);
236  timesFailed_.store(0, std::memory_order_release);
237  timesExcept_.store(0, std::memory_order_release);
238  }
std::atomic< int > timesVisited_
Definition: Worker.h:605
std::atomic< int > timesExcept_
Definition: Worker.h:608
std::atomic< int > timesFailed_
Definition: Worker.h:607
std::atomic< int > timesPassed_
Definition: Worker.h:606
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ clearModule()

void edm::Worker::clearModule ( )
inline

Definition at line 123 of file Worker.h.

References doClearModule(), and moduleValid_.

123  {
124  moduleValid_ = false;
125  doClearModule();
126  }
bool moduleValid_
Definition: Worker.h:625
virtual void doClearModule()=0

◆ consumesInfo()

virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual

◆ convertCurrentProcessAlias()

virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

◆ description()

ModuleDescription const* edm::Worker::description ( ) const
inline

◆ doClearModule()

virtual void edm::Worker::doClearModule ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by clearModule().

◆ doTransformAsync()

void edm::Worker::doTransformAsync ( WaitingTaskHolder  iTask,
size_t  iTransformIndex,
EventPrincipal const &  iPrincipal,
ServiceToken const &  iToken,
StreamID  ,
ModuleCallingContext const &  mcc,
StreamContext const *   
)

Definition at line 245 of file Worker.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), edm::ModuleCallingContext::getStreamContext(), edm::WaitingTaskHolder::group(), implDoTransformAsync(), itemToGetForTransform(), edm::make_waiting_task(), edm::ModuleCallingContext::parent(), edm::Principal::prefetchAsync(), and TrackValidation_cff::task.

Referenced by edm::TransformingProductResolver::prefetchAsync_().

251  {
252  ServiceWeakToken weakToken = iToken;
253 
254  //Need to make the services available early so other services can see them
255  auto task = make_waiting_task(
256  [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
257  //post prefetch signal
258  actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
259  if (iExcept) {
260  iTask.doneWaiting(*iExcept);
261  return;
262  }
263  implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
264  });
265 
266  //pre prefetch signal
267  actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
268  iPrincipal.prefetchAsync(
269  WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
270  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0

◆ doWorkAsync()

template<typename T >
void edm::Worker::doWorkAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1037 of file Worker.h.

References edm::WaitingTaskList::add(), ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), visDQMUpload::context, watchdog::group, hasAcquire(), info(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), moduleCallingContext_, eostools::move(), AlCaHLTBitMon_ParallelJobs::p, prePrefetchSelectionAsync(), fetchall_from_DQM_v2::release, alignCSCRings::s, edm::ModuleCallingContext::setContext(), submitPVValidationJobs::t, TrackValidation_cff::task, timesVisited_, unpackBuffers-CaloStage2::token, waitingTasks_, and workStarted_.

Referenced by edm::UnscheduledProductResolver::prefetchAsync_(), and edm::WorkerInPath::runWorkerAsync().

1042  {
1043  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1044  return;
1045  }
1046 
1047  //Need to check workStarted_ before adding to waitingTasks_
1048  bool expected = false;
1049  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1050 
1052  if constexpr (T::isEvent_) {
1053  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1054  }
1055 
1056  if (workStarted) {
1058 
1059  //if have TriggerResults based selection we want to reject the event before doing prefetching
1060  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1061  //We need to run the selection in a different task so that
1062  // we can prefetch the data needed for the selection
1063  WaitingTask* moduleTask =
1064  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1065 
1066  //make sure the task is either run or destroyed
1067  struct DestroyTask {
1068  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1069 
1070  ~DestroyTask() {
1071  auto p = m_task.exchange(nullptr);
1072  if (p) {
1073  TaskSentry s{p};
1074  }
1075  }
1076 
1077  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1078 
1079  private:
1080  std::atomic<edm::WaitingTask*> m_task;
1081  };
1082  if constexpr (T::isEvent_) {
1083  if (hasAcquire()) {
1084  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1085  ServiceWeakToken weakToken = token;
1086  auto* group = task.group();
1087  moduleTask = make_waiting_task(
1088  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1089  WaitingTaskWithArenaHolder runTaskHolder(
1090  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1091  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1092  t.execute();
1093  });
1094  }
1095  }
1096  auto* group = task.group();
1097  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1098  ServiceWeakToken weakToken = token;
1099  auto selectionTask =
1100  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1101  std::exception_ptr const*) mutable {
1102  ServiceRegistry::Operate guard(weakToken.lock());
1103  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1104  weakToken.lock(),
1105  parentContext,
1106  info,
1107  T::transition_);
1108  });
1109  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1110  } else {
1111  WaitingTask* moduleTask =
1112  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1113  auto group = task.group();
1114  if constexpr (T::isEvent_) {
1115  if (hasAcquire()) {
1116  WaitingTaskWithArenaHolder runTaskHolder(
1117  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1118  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1119  }
1120  }
1121  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1122  }
1123  }
1124  }
virtual bool hasAcquire() const =0
std::atomic< int > timesVisited_
Definition: Worker.h:605
static const TGPicture * info(bool iBackgroundIsBlack)
std::atomic< bool > workStarted_
Definition: Worker.h:623
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:155
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
def move(src, dest)
Definition: eostools.py:511

◆ doWorkNoPrefetchingAsync()

template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  serviceToken,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1159 of file Worker.h.

References edm::WaitingTaskList::add(), CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), esPrefetchAsync(), watchdog::group, info(), edm::ModuleCallingContext::kPrefetching, edm::ServiceWeakToken::lock(), edm::make_waiting_task(), moduleCallingContext_, eostools::move(), needsESPrefetching(), createBeamHaloJobs::queue, serializeRunModule(), edm::ModuleCallingContext::setContext(), TrackValidation_cff::task, waitingTasks_, and workStarted_.

Referenced by edm::WorkerInPath::runWorkerAsync().

1164  {
1165  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1166  return;
1167  }
1168 
1169  //Need to check workStarted_ before adding to waitingTasks_
1170  bool expected = false;
1171  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1172 
1174  if (workStarted) {
1175  ServiceWeakToken weakToken = serviceToken;
1176  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1177  std::exception_ptr exceptionPtr;
1178  // Caught exception is propagated via WaitingTaskList
1179  CMS_SA_ALLOW try {
1180  //Need to make the services available
1181  ServiceRegistry::Operate guard(weakToken.lock());
1182 
1183  this->runModule<T>(info, streamID, parentContext, context);
1184  } catch (...) {
1185  exceptionPtr = std::current_exception();
1186  }
1187  this->waitingTasks_.doneWaiting(exceptionPtr);
1188  };
1189 
1190  if (needsESPrefetching(T::transition_)) {
1191  auto group = task.group();
1192  auto afterPrefetch =
1193  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1194  if (iExcept) {
1195  this->waitingTasks_.doneWaiting(*iExcept);
1196  } else {
1197  if (auto queue = this->serializeRunModule()) {
1198  queue.push(*group, toDo);
1199  } else {
1200  group->run(toDo);
1201  }
1202  }
1203  });
1206  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1207  } else {
1208  auto group = task.group();
1209  if (auto queue = this->serializeRunModule()) {
1210  queue.push(*group, toDo);
1211  } else {
1212  group->run(toDo);
1213  }
1214  }
1215  }
1216  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::atomic< bool > workStarted_
Definition: Worker.h:623
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:368
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:204
virtual TaskQueueAdaptor serializeRunModule()=0
def move(src, dest)
Definition: eostools.py:511

◆ edPrefetchAsync()

void edm::Worker::edPrefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
Principal const &  iPrincipal 
) const
private

Definition at line 229 of file Worker.cc.

References edm::Principal::branchType(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetFrom(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and unpackBuffers-CaloStage2::token.

Referenced by prefetchAsync().

229  {
230  // Prefetch products the module declares it consumes
231  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
232 
233  for (auto const& item : items) {
234  ProductResolverIndex productResolverIndex = item.productResolverIndex();
235  bool skipCurrentProcess = item.skipCurrentProcess();
236  if (productResolverIndex != ProductResolverIndexAmbiguous) {
237  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
238  }
239  }
240  }
unsigned int ProductResolverIndex
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613

◆ emitPostModuleEventPrefetchingSignal()

void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 372 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

372  {
373  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
374  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
StreamContext const * getStreamContext() const

◆ emitPostModuleGlobalPrefetchingSignal()

void edm::Worker::emitPostModuleGlobalPrefetchingSignal ( )
inline private

Definition at line 381 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getGlobalContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

381  {
382  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
383  moduleCallingContext_);
384  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
GlobalContext const * getGlobalContext() const

◆ emitPostModuleStreamPrefetchingSignal()

void edm::Worker::emitPostModuleStreamPrefetchingSignal ( )
inline, private

Definition at line 376 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

376  {
377  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
379  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
StreamContext const * getStreamContext() const

◆ endJob()

void edm::Worker::endJob ( void  )

Definition at line 299 of file Worker.cc.

References actReg_, cms::Exception::addContext(), cms::cuda::assert(), submitPVResolutionJobs::desc, description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

299  {
300  try {
301  convertException::wrap([&]() {
302  ModuleDescription const* desc = description();
303  assert(desc != nullptr);
304  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
305  implEndJob();
306  });
307  } catch (cms::Exception& ex) {
308  state_ = Exception;
309  std::ostringstream ost;
310  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
311  ex.addContext(ost.str());
312  throw;
313  }
314  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
assert(be >=bs)
std::atomic< State > state_
Definition: Worker.h:609
virtual void implEndJob()=0
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ endStream()

void edm::Worker::endStream ( StreamID  id,
StreamContext & streamContext
)

Definition at line 340 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

340  {
341  try {
342  convertException::wrap([&]() {
343  streamContext.setTransition(StreamContext::Transition::kEndStream);
344  streamContext.setEventID(EventID(0, 0, 0));
345  streamContext.setRunIndex(RunIndex::invalidRunIndex());
346  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
347  streamContext.setTimestamp(Timestamp());
348  ParentContext parentContext(&streamContext);
349  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
351  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
352  implEndStream(id);
353  });
354  } catch (cms::Exception& ex) {
355  state_ = Exception;
356  std::ostringstream ost;
357  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
358  << "'";
359  ex.addContext(ost.str());
360  throw;
361  }
362  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:609
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ esItemsToGetFrom()

virtual std::vector<ESResolverIndex> const& edm::Worker::esItemsToGetFrom ( Transition  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync(), and needsESPrefetching().

◆ esPrefetchAsync()

void edm::Worker::esPrefetchAsync ( WaitingTaskHolder  iTask,
EventSetupImpl const &  iImpl,
Transition  iTrans,
ServiceToken const &  iToken 
)
private

Definition at line 204 of file Worker.cc.

References cms::cuda::assert(), esItemsToGetFrom(), esRecordsToGetFrom(), edm::EventSetupImpl::findImpl(), mps_fire::i, mps_monitormerge::items, moduleCallingContext_, edm::NumberOfEventSetupTransitions, and edm::eventsetup::EventSetupRecordImpl::prefetchAsync().

Referenced by doWorkNoPrefetchingAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), and edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync().

207  {
209  return;
210  }
211  auto const& recs = esRecordsToGetFrom(iTrans);
212  auto const& items = esItemsToGetFrom(iTrans);
213 
214  assert(items.size() == recs.size());
215  if (items.empty()) {
216  return;
217  }
218 
219  for (size_t i = 0; i != items.size(); ++i) {
220  if (recs[i] != ESRecordIndex{}) {
221  auto rec = iImpl.findImpl(recs[i]);
222  if (rec) {
223  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
224  }
225  }
226  }
227  }
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0

◆ esRecordsToGetFrom()

virtual std::vector<ESRecordIndex> const& edm::Worker::esRecordsToGetFrom ( Transition  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync().

◆ globalLuminosityBlocksQueue()

virtual SerialTaskQueue* edm::Worker::globalLuminosityBlocksQueue ( )
pure virtual

◆ globalRunsQueue()

virtual SerialTaskQueue* edm::Worker::globalRunsQueue ( )
pure virtual

◆ handleExternalWorkException()

std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 427 of file Worker.cc.

References edm::exceptionContext(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

427  {
429  try {
430  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
431  } catch (cms::Exception& ex) {
432  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
434  return std::current_exception();
435  }
436  }
437  return iEPtr;
438  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool ranAcquireWithoutException_
Definition: Worker.h:624
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
auto wrap(F iFunc) -> decltype(iFunc())

◆ hasAccumulator()

virtual bool edm::Worker::hasAccumulator ( ) const
pure virtual

◆ hasAcquire()

virtual bool edm::Worker::hasAcquire ( ) const
private, pure virtual

◆ implBeginJob()

virtual void edm::Worker::implBeginJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

◆ implBeginStream()

virtual void edm::Worker::implBeginStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

◆ implDo()

virtual bool edm::Worker::implDo ( EventTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoAccessInputProcessBlock()

virtual bool edm::Worker::implDoAccessInputProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoAcquire()

virtual void edm::Worker::implDoAcquire ( EventTransitionInfo const &  ,
ModuleCallingContext const *  ,
WaitingTaskWithArenaHolder &
)
protected, pure virtual

◆ implDoBegin() [1/2]

virtual bool edm::Worker::implDoBegin ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoBegin() [2/2]

virtual bool edm::Worker::implDoBegin ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoBeginProcessBlock()

virtual bool edm::Worker::implDoBeginProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoEnd() [1/2]

virtual bool edm::Worker::implDoEnd ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoEnd() [2/2]

virtual bool edm::Worker::implDoEnd ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoEndProcessBlock()

virtual bool edm::Worker::implDoEndProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoPrePrefetchSelection()

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  ,
EventPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamBegin() [1/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamBegin() [2/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoStreamEnd() [1/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamEnd() [2/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoTransformAsync()

virtual void edm::Worker::implDoTransformAsync ( WaitingTaskHolder  ,
size_t  iTransformIndex,
EventPrincipal const &  ,
ParentContext const &  ,
ServiceWeakToken const &   
)
protected, pure virtual

◆ implEndJob()

virtual void edm::Worker::implEndJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

◆ implEndStream()

virtual void edm::Worker::implEndStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

◆ implNeedToRunSelection()

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protected, pure virtual

◆ implRegisterThinnedAssociations()

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper &
)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by registerThinnedAssociations().

◆ implRespondToCloseInputFile()

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseInputFile().

◆ implRespondToCloseOutputFile()

virtual void edm::Worker::implRespondToCloseOutputFile ( )
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseOutputFile().

◆ implRespondToOpenInputFile()

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToOpenInputFile().

◆ itemsMayGet()

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGet()

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGetForSelection()

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protected, pure virtual

◆ itemsToGetFrom()

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edPrefetchAsync().

◆ itemToGetForTransform()

virtual ProductResolverIndex edm::Worker::itemToGetForTransform ( size_t  iTransformIndex) const
protected, pure virtual

◆ moduleConcurrencyType()

virtual ConcurrencyTypes edm::Worker::moduleConcurrencyType ( ) const
pure virtual

◆ modulesWhoseProductsAreConsumed()

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &  modules,
std::vector< ModuleProcessName > &  modulesInPreviousProcesses,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const *> const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

◆ moduleType()

virtual Types edm::Worker::moduleType ( ) const
pure virtual

◆ needsESPrefetching()

bool edm::Worker::needsESPrefetching ( Transition  iTrans) const
inline, private, noexcept

Definition at line 368 of file Worker.h.

References esItemsToGetFrom(), and edm::NumberOfEventSetupTransitions.

Referenced by doWorkNoPrefetchingAsync().

368  {
369  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
370  }
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0

◆ operator=()

Worker& edm::Worker::operator= ( Worker const &  )
delete

◆ postDoEvent()

void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 382 of file Worker.cc.

References earlyDeleteHelper_, and iEvent.

382  {
383  if (earlyDeleteHelper_) {
384  earlyDeleteHelper_->moduleRan(iEvent);
385  }
386  }
int iEvent
Definition: GenABIO.cc:224
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620

◆ preActionBeforeRunEventAsync()

virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTaskHolder  iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

◆ prefetchAsync()

template<typename T >
void edm::Worker::prefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
ParentContext const &  parentContext,
typename T::TransitionInfoType const &  transitionInfo,
Transition  iTransition 
)
private

Definition at line 1011 of file Worker.h.

References actReg_, edm::Principal::branchType(), ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), edPrefetchAsync(), edm::ModuleCallingContext::getGlobalContext(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::ModuleCallingContext::setContext(), and unpackBuffers-CaloStage2::token.

1015  {
1016  Principal const& principal = transitionInfo.principal();
1017 
1019 
1020  if constexpr (T::isEvent_) {
1021  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1022  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1023  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1024  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1025  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1026  }
1027 
1028  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1029  edPrefetchAsync(iTask, token, principal);
1030 
1031  if (principal.branchType() == InEvent) {
1033  }
1034  }
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:229
StreamContext const * getStreamContext() const
GlobalContext const * getGlobalContext() const

◆ prePrefetchSelectionAsync() [1/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  group,
WaitingTask * task,
ServiceToken const &  token,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 155 of file Worker.cc.

References CMS_SA_ALLOW, edm::TaskBase::decrement_ref_count(), edm::WaitingTaskList::doneWaiting(), edm::TaskBase::execute(), watchdog::group, implDoPrePrefetchSelection(), edm::TaskBase::increment_ref_count(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetForSelection(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, alignCSCRings::s, timesRun_, unpackBuffers-CaloStage2::token, and waitingTasks_.

Referenced by doWorkAsync().

159  {
160  successTask->increment_ref_count();
161 
162  ServiceWeakToken weakToken = token;
163  auto choiceTask =
164  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
165  ServiceRegistry::Operate guard(weakToken.lock());
166  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
167  CMS_SA_ALLOW try {
168  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
169  timesRun_.fetch_add(1, std::memory_order_relaxed);
170  setPassed<true>();
171  waitingTasks_.doneWaiting(nullptr);
172  //TBB requires that destroyed tasks have count 0
173  if (0 == successTask->decrement_ref_count()) {
174  TaskSentry s(successTask);
175  }
176  return;
177  }
178  } catch (...) {
179  }
180  if (0 == successTask->decrement_ref_count()) {
181  group.run([successTask]() {
182  TaskSentry s(successTask);
183  successTask->execute();
184  });
185  }
186  });
187 
188  WaitingTaskHolder choiceHolder{group, choiceTask};
189 
190  std::vector<ProductResolverIndexAndSkipBit> items;
192 
193  for (auto const& item : items) {
194  ProductResolverIndex productResolverIndex = item.productResolverIndex();
195  bool skipCurrentProcess = item.skipCurrentProcess();
196  if (productResolverIndex != ProductResolverIndexAmbiguous) {
197  iPrincipal->prefetchAsync(
198  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
199  }
200  }
201  choiceHolder.doneWaiting(std::exception_ptr{});
202  }
#define CMS_SA_ALLOW
unsigned int ProductResolverIndex
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ prePrefetchSelectionAsync() [2/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  ,
WaitingTask * task,
ServiceToken const &  ,
StreamID  stream,
void const *   
)
inline

Definition at line 141 of file Worker.h.

References cms::cuda::assert().

142  {
143  assert(false);
144  }
assert(be >=bs)

◆ registerThinnedAssociations()

void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper & helper
)

Definition at line 364 of file Worker.cc.

References cms::Exception::addContext(), description(), implRegisterThinnedAssociations(), and HerwigMaxPtPartonFilter_cfi::moduleLabel.

364  {
365  try {
367  } catch (cms::Exception& ex) {
368  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
369  throw ex;
370  }
371  }
Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169

◆ reset()

void edm::Worker::reset ( void  )
inline

Definition at line 188 of file Worker.h.

References cached_exception_, numberOfPathsLeftToRun_, numberOfPathsOn_, Ready, edm::WaitingTaskList::reset(), state_, waitingTasks_, and workStarted_.

Referenced by edm::WorkerManager::resetAll().

188  {
189  cached_exception_ = std::exception_ptr();
190  state_ = Ready;
192  workStarted_ = false;
194  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
std::atomic< bool > workStarted_
Definition: Worker.h:623
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:616
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
std::atomic< State > state_
Definition: Worker.h:609
int numberOfPathsOn_
Definition: Worker.h:610

◆ resetModuleDescription()

void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected

Definition at line 272 of file Worker.cc.

References cms::cuda::assert(), checkForShouldTryToContinue(), moduleCallingContext_, edm::ModuleCallingContext::parent(), edm::ModuleCallingContext::previousModuleOnThread(), edm::ModuleCallingContext::state(), and groupFilesInBlocks::temp.

Referenced by edm::WorkerT< T >::setModule().

272  {
273  ModuleCallingContext temp(iDesc,
274  0,
279  assert(iDesc);
281  }
ModuleCallingContext const * previousModuleOnThread() const
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
ParentContext const & parent() const

◆ resolvePutIndicies()

virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ respondToCloseInputFile()

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 184 of file Worker.h.

References implRespondToCloseInputFile().

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0

◆ respondToCloseOutputFile()

void edm::Worker::respondToCloseOutputFile ( )
inline

Definition at line 185 of file Worker.h.

References implRespondToCloseOutputFile().

virtual void implRespondToCloseOutputFile()=0

◆ respondToOpenInputFile()

void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 183 of file Worker.h.

References implRespondToOpenInputFile().

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0

◆ runAcquire()

void edm::Worker::runAcquire ( EventTransitionInfo const &  info,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder & holder
)
private

Definition at line 388 of file Worker.cc.

References edm::exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), shouldTryToContinue_, timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

390  {
391  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
392  try {
393  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
394  } catch (cms::Exception& ex) {
396  if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
397  timesRun_.fetch_add(1, std::memory_order_relaxed);
398  throw;
399  }
400  }
401  }
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
static const TGPicture * info(bool iBackgroundIsBlack)
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:604
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runAcquireAfterAsyncPrefetch()

void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
EventTransitionInfo const &  eventTransitionInfo,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 403 of file Worker.cc.

References CMS_SA_ALLOW, edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), shouldRethrowException(), and shouldTryToContinue_.

406  {
408  std::exception_ptr exceptionPtr;
409  if (iEPtr) {
410  if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
411  exceptionPtr = iEPtr;
412  }
414  } else {
415  // Caught exception is propagated via WaitingTaskWithArenaHolder
416  CMS_SA_ALLOW try {
417  runAcquire(eventTransitionInfo, parentContext, holder);
419  } catch (...) {
420  exceptionPtr = std::current_exception();
421  }
422  }
423  // It is important this is after runAcquire completely finishes
424  holder.doneWaiting(exceptionPtr);
425  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:624
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:388
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runModule()

template<typename T >
bool edm::Worker::runModule ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1219 of file Worker.h.

References actReg_, cms::cuda::assert(), cached_exception_, ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), visDQMUpload::context, edm::exceptionContext(), moduleCallingContext_, shouldRethrowException(), shouldTryToContinue_, timesRun_, and edm::convertException::wrap().

1222  {
1223  //unscheduled producers should advance this
1224  //if (T::isEvent_) {
1225  // ++timesVisited_;
1226  //}
1227  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1228  if constexpr (T::isEvent_) {
1229  timesRun_.fetch_add(1, std::memory_order_relaxed);
1230  }
1231 
1232  bool rc = true;
1233  try {
1234  convertException::wrap([&]() {
1235  rc = workerhelper::CallImpl<T>::call(
1236  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1237 
1238  if (rc) {
1239  setPassed<T::isEvent_>();
1240  } else {
1241  setFailed<T::isEvent_>();
1242  }
1243  });
1244  } catch (cms::Exception& ex) {
1246  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1248  setException<T::isEvent_>(std::current_exception());
1249  std::rethrow_exception(cached_exception_);
1250  } else {
1251  rc = setPassed<T::isEvent_>();
1252  }
1253  }
1254 
1255  return rc;
1256  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
std::exception_ptr cached_exception_
Definition: Worker.h:616
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:604
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runModuleAfterAsyncPrefetch()

template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1127 of file Worker.h.

References CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, edm::ModuleCallingContext::setContext(), shouldRethrowException(), shouldTryToContinue_, and waitingTasks_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

1131  {
1132  std::exception_ptr exceptionPtr;
1133  bool shouldRun = true;
1134  if (iEPtr) {
1135  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1136  exceptionPtr = iEPtr;
1137  setException<T::isEvent_>(exceptionPtr);
1138  shouldRun = false;
1139  } else {
1140  if (not shouldTryToContinue_) {
1141  setPassed<T::isEvent_>();
1142  shouldRun = false;
1143  }
1144  }
1145  }
1146  if (shouldRun) {
1147  // Caught exception is propagated via WaitingTaskList
1148  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1149  exceptionPtr = std::current_exception();
1150  }
1151  } else {
1153  }
1154  waitingTasks_.doneWaiting(exceptionPtr);
1155  return exceptionPtr;
1156  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runModuleDirectly()

template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1259 of file Worker.h.

References visDQMUpload::context, and timesVisited_.

Referenced by edm::Path::finished().

1262  {
1263  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1264  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1265  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1266  }
std::atomic< int > timesVisited_
Definition: Worker.h:605

◆ selectInputProcessBlocks()

virtual void edm::Worker::selectInputProcessBlocks ( ProductRegistry const &  ,
ProcessBlockHelperBase const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ serializeRunModule()

virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
private pure virtual

◆ setActivityRegistry()

void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry > areg)

The signals are required to live longer than the last call to 'doWork'; this was done to improve performance, based on profiling.

Definition at line 111 of file Worker.cc.

References actReg_.

111 { actReg_ = areg; }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618

◆ setEarlyDeleteHelper()

void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper * iHelper)

Definition at line 242 of file Worker.cc.

References earlyDeleteHelper_.

242 { earlyDeleteHelper_ = iHelper; }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620

◆ setException()

template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inline private

Definition at line 349 of file Worker.h.

References cached_exception_, Exception, state_, and timesExcept_.

349  {
350  if (IS_EVENT) {
351  timesExcept_.fetch_add(1, std::memory_order_relaxed);
352  }
353  cached_exception_ = iException; // propagate_const<T> has no reset() function
354  state_ = Exception;
355  return cached_exception_;
356  }
std::atomic< int > timesExcept_
Definition: Worker.h:608
std::exception_ptr cached_exception_
Definition: Worker.h:616
std::atomic< State > state_
Definition: Worker.h:609

◆ setFailed()

template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inline private

Definition at line 340 of file Worker.h.

References Fail, state_, and timesFailed_.

340  {
341  if (IS_EVENT) {
342  timesFailed_.fetch_add(1, std::memory_order_relaxed);
343  }
344  state_ = Fail;
345  return false;
346  }
std::atomic< int > timesFailed_
Definition: Worker.h:607
std::atomic< State > state_
Definition: Worker.h:609

◆ setPassed()

template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inline private

Definition at line 331 of file Worker.h.

References Pass, state_, and timesPassed_.

331  {
332  if (IS_EVENT) {
333  timesPassed_.fetch_add(1, std::memory_order_relaxed);
334  }
335  state_ = Pass;
336  return true;
337  }
std::atomic< State > state_
Definition: Worker.h:609
std::atomic< int > timesPassed_
Definition: Worker.h:606

◆ shouldRethrowException()

bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
bool  isTryToContinue 
) const
private

Definition at line 120 of file Worker.cc.

References writedatasetfile::action, actions_, cms::Exception::category(), edm::ExceptionToActionTable::find(), edm::exception_actions::IgnoreCompletely, edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::TryToContinue, and edm::convertException::wrap().

Referenced by runAcquire(), runAcquireAfterAsyncPrefetch(), runModule(), and runModuleAfterAsyncPrefetch().

123  {
124  // NOTE: the warning printed as a result of ignoring or failing
125  // a module will only be printed during the full true processing
126  // pass of this module
127 
128  // Get the action corresponding to this exception. However, if processing
129  // something other than an event (e.g. run, lumi) always rethrow.
130  if (not isEvent) {
131  return true;
132  }
133  try {
134  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
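135  } catch (cms::Exception& ex) {
136  exception_actions::ActionCodes action = actions_->find(ex.category());
137 
138  if (action == exception_actions::Rethrow) {
139  return true;
140  }
141  if (action == exception_actions::TryToContinue) {
142  if (shouldTryToContinue) {
143  edm::printCmsExceptionWarning("TryToContinue", ex);
144  }
145  return not shouldTryToContinue;
146  }
147  if (action == exception_actions::IgnoreCompletely) {
148  edm::printCmsExceptionWarning("IgnoreCompletely", ex);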
149  return false;
150  }
151  }
152  return true;
153  }
std::string const & category() const
Definition: Exception.cc:147
ExceptionToActionTable const * actions_
Definition: Worker.h:615
exception_actions::ActionCodes find(const std::string &category) const
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ skipOnPath()

void edm::Worker::skipOnPath ( EventPrincipal const &  iEvent)

Definition at line 373 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), earlyDeleteHelper_, iEvent, numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

373  {
374  if (earlyDeleteHelper_) {
375  earlyDeleteHelper_->pathFinished(iEvent);
376  }
377  if (0 == --numberOfPathsLeftToRun_) {
378  waitingTasks_.doneWaiting(cached_exception_);
379  }
380  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
std::exception_ptr cached_exception_
Definition: Worker.h:616
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620

◆ state()

State edm::Worker::state ( ) const
inline

Definition at line 247 of file Worker.h.

References state_.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

247 { return state_; }
std::atomic< State > state_
Definition: Worker.h:609

◆ timesExcept()

int edm::Worker::timesExcept ( ) const
inline

Definition at line 246 of file Worker.h.

References timesExcept_.

246 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:608

◆ timesFailed()

int edm::Worker::timesFailed ( ) const
inline

Definition at line 245 of file Worker.h.

References timesFailed_.

245 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:607

◆ timesPass()

int edm::Worker::timesPass ( ) const
inline

Definition at line 249 of file Worker.h.

References timesPassed().

249 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:244

◆ timesPassed()

int edm::Worker::timesPassed ( ) const
inline

Definition at line 244 of file Worker.h.

References timesPassed_.

Referenced by timesPass().

244 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:606

◆ timesRun()

int edm::Worker::timesRun ( ) const
inline

Definition at line 242 of file Worker.h.

References timesRun_.

242 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ timesVisited()

int edm::Worker::timesVisited ( ) const
inline

Definition at line 243 of file Worker.h.

References timesVisited_.

243 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:605

◆ transformIndex()

size_t edm::Worker::transformIndex ( edm::BranchDescription const &  ) const
pure virtual

◆ updateLookup() [1/2]

virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ updateLookup() [2/2]

virtual void edm::Worker::updateLookup ( eventsetup::ESRecordsToProductResolverIndices const &  )
pure virtual

Implemented in edm::WorkerT< T >.

◆ waitingTaskList()

edm::WaitingTaskList& edm::Worker::waitingTaskList ( )
inline

Definition at line 254 of file Worker.h.

References waitingTasks_.

254 { return waitingTasks_; }
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622

◆ wantsGlobalLuminosityBlocks()

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual

◆ wantsGlobalRuns()

virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual

◆ wantsInputProcessBlocks()

virtual bool edm::Worker::wantsInputProcessBlocks ( ) const
pure virtual

◆ wantsProcessBlocks()

virtual bool edm::Worker::wantsProcessBlocks ( ) const
pure virtual

◆ wantsStreamLuminosityBlocks()

virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual

◆ wantsStreamRuns()

virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual

◆ workerType()

virtual std::string edm::Worker::workerType ( ) const
protected pure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

◆ workerhelper::CallImpl

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 258 of file Worker.h.

Member Data Documentation

◆ actions_

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 615 of file Worker.h.

Referenced by shouldRethrowException().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

◆ beginSucceeded_

bool edm::Worker::beginSucceeded_ = false
private

◆ cached_exception_

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 616 of file Worker.h.

Referenced by reset(), runModule(), setException(), and skipOnPath().

◆ earlyDeleteHelper_

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 620 of file Worker.h.

Referenced by postDoEvent(), setEarlyDeleteHelper(), and skipOnPath().

◆ moduleCallingContext_

ModuleCallingContext edm::Worker::moduleCallingContext_
private

◆ moduleValid_

bool edm::Worker::moduleValid_ = true
private

Definition at line 625 of file Worker.h.

Referenced by clearModule(), and description().

◆ numberOfPathsLeftToRun_

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 611 of file Worker.h.

Referenced by reset(), and skipOnPath().

◆ numberOfPathsOn_

int edm::Worker::numberOfPathsOn_
private

Definition at line 610 of file Worker.h.

Referenced by addedToPath(), and reset().

◆ ranAcquireWithoutException_

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 624 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

◆ shouldTryToContinue_

bool edm::Worker::shouldTryToContinue_ = false
private

◆ state_

std::atomic<State> edm::Worker::state_
private

◆ timesExcept_

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 608 of file Worker.h.

Referenced by clearCounters(), setException(), and timesExcept().

◆ timesFailed_

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 607 of file Worker.h.

Referenced by clearCounters(), setFailed(), and timesFailed().

◆ timesPassed_

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 606 of file Worker.h.

Referenced by clearCounters(), setPassed(), and timesPassed().

◆ timesRun_

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 604 of file Worker.h.

Referenced by clearCounters(), prePrefetchSelectionAsync(), runAcquire(), runModule(), and timesRun().

◆ timesVisited_

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 605 of file Worker.h.

Referenced by clearCounters(), doWorkAsync(), runModuleDirectly(), and timesVisited().

◆ waitingTasks_

edm::WaitingTaskList edm::Worker::waitingTasks_
private

◆ workStarted_

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 623 of file Worker.h.

Referenced by doWorkAsync(), doWorkNoPrefetchingAsync(), and reset().