CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes | Friends
edm::Worker Class Referenceabstract

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 

Public Types

enum  ConcurrencyTypes {
  kGlobal, kLimited, kOne, kStream,
  kLegacy
}
 
enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTaskHolder task)
 
void clearCounters ()
 
void clearModule ()
 
virtual std::vector< ConsumesInfoconsumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * description () const
 
void doTransformAsync (WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
 
template<typename T >
void doWorkAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual SerialTaskQueueglobalLuminosityBlocksQueue ()=0
 
virtual SerialTaskQueueglobalRunsQueue ()=0
 
virtual bool hasAccumulator () const =0
 
virtual ConcurrencyTypes moduleConcurrencyType () const =0
 
virtual void modulesWhoseProductsAreConsumed (std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Workeroperator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToCloseOutputFile ()
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual void selectInputProcessBlocks (ProductRegistry const &, ProcessBlockHelperBase const &)=0
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath (EventPrincipal const &iEvent)
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual size_t transformIndex (edm::BranchDescription const &) const =0
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual void updateLookup (eventsetup::ESRecordsToProductResolverIndices const &)=0
 
edm::WaitingTaskListwaitingTaskList ()
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsInputProcessBlocks () const =0
 
virtual bool wantsProcessBlocks () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistryactivityRegistry ()
 
virtual void doClearModule ()=0
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoAccessInputProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual void implDoAcquire (EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
 
virtual bool implDoBegin (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBegin (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBeginProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEndProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoPrePrefetchSelection (StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual void implDoTransformAsync (WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual ProductResolverIndex itemToGetForTransform (size_t iTransformIndex) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void checkForShouldTryToContinue (ModuleDescription const &)
 
void edPrefetchAsync (WaitingTaskHolder, ServiceToken const &, Principal const &) const
 
void emitPostModuleEventPrefetchingSignal ()
 
void emitPostModuleGlobalPrefetchingSignal ()
 
void emitPostModuleStreamPrefetchingSignal ()
 
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom (Transition) const =0
 
void esPrefetchAsync (WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
 
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom (Transition) const =0
 
std::exception_ptr handleExternalWorkException (std::exception_ptr iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToCloseOutputFile ()=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
bool needsESPrefetching (Transition iTrans) const noexcept
 
virtual void preActionBeforeRunEventAsync (WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
template<typename T >
void prefetchAsync (WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
 
void runAcquire (EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
 
template<typename T >
bool runModule (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistryactReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
bool moduleValid_ = true
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
bool shouldTryToContinue_ = false
 
std::atomic< Statestate_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 92 of file Worker.h.

Member Enumeration Documentation

◆ ConcurrencyTypes

Enumerator
kGlobal 
kLimited 
kOne 
kStream 
kLegacy 

Definition at line 96 of file Worker.h.

◆ State

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 94 of file Worker.h.

◆ Types

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 95 of file Worker.h.

Constructor & Destructor Documentation

◆ Worker() [1/2]

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 90 of file Worker.cc.

References checkForShouldTryToContinue().

91  : timesRun_(0),
92  timesVisited_(0),
93  timesPassed_(0),
94  timesFailed_(0),
95  timesExcept_(0),
96  state_(Ready),
100  actions_(iActions),
102  actReg_(),
103  earlyDeleteHelper_(nullptr),
104  workStarted_(false),
107  }
std::atomic< int > timesVisited_
Definition: Worker.h:605
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
std::atomic< int > timesExcept_
Definition: Worker.h:608
std::atomic< bool > workStarted_
Definition: Worker.h:623
std::atomic< int > timesFailed_
Definition: Worker.h:607
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ExceptionToActionTable const * actions_
Definition: Worker.h:615
bool ranAcquireWithoutException_
Definition: Worker.h:624
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
std::exception_ptr cached_exception_
Definition: Worker.h:616
std::atomic< State > state_
Definition: Worker.h:609
int numberOfPathsOn_
Definition: Worker.h:610
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
std::atomic< int > timesPassed_
Definition: Worker.h:606
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ ~Worker()

edm::Worker::~Worker ( )
virtual

Definition at line 109 of file Worker.cc.

109 {}

◆ Worker() [2/2]

edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

◆ activityRegistry()

ActivityRegistry* edm::Worker::activityRegistry ( )
inlineprotected

Definition at line 298 of file Worker.h.

References actReg_.

298 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618

◆ addedToPath()

void edm::Worker::addedToPath ( )
inline

Definition at line 240 of file Worker.h.

References numberOfPathsOn_.

240 { ++numberOfPathsOn_; }
int numberOfPathsOn_
Definition: Worker.h:610

◆ beginJob()

void edm::Worker::beginJob ( void  )

Definition at line 280 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob().

280  {
281  try {
282  convertException::wrap([&]() {
283  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
284  implBeginJob();
285  });
286  } catch (cms::Exception& ex) {
287  state_ = Exception;
288  std::ostringstream ost;
289  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
290  << "'";
291  ex.addContext(ost.str());
292  throw;
293  }
294  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
std::atomic< State > state_
Definition: Worker.h:609
ModuleDescription const * description() const
Definition: Worker.h:198
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ beginStream()

void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 313 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

313  {
314  try {
315  convertException::wrap([&]() {
316  streamContext.setTransition(StreamContext::Transition::kBeginStream);
317  streamContext.setEventID(EventID(0, 0, 0));
318  streamContext.setRunIndex(RunIndex::invalidRunIndex());
319  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
320  streamContext.setTimestamp(Timestamp());
321  ParentContext parentContext(&streamContext);
322  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
324  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
325  implBeginStream(id);
326  });
327  } catch (cms::Exception& ex) {
328  state_ = Exception;
329  std::ostringstream ost;
330  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
331  << "'";
332  ex.addContext(ost.str());
333  throw;
334  }
335  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:609
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ callWhenDoneAsync()

void edm::Worker::callWhenDoneAsync ( WaitingTaskHolder  task)
inline

Definition at line 177 of file Worker.h.

References edm::WaitingTaskList::add(), eostools::move(), TrackValidation_cff::task, and waitingTasks_.

Referenced by edm::SwitchProducerProductResolver::prefetchAsync_().

edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
def move(src, dest)
Definition: eostools.py:511

◆ checkForShouldTryToContinue()

void edm::Worker::checkForShouldTryToContinue ( ModuleDescription const &  iDesc)
private

Definition at line 113 of file Worker.cc.

References edm::pset::Registry::getMapped(), edm::pset::Registry::instance(), edm::ModuleDescription::parameterSetID(), muonDTDigis_cfi::pset, and shouldTryToContinue_.

Referenced by resetModuleDescription(), and Worker().

113  {
114  auto pset = edm::pset::Registry::instance()->getMapped(iDesc.parameterSetID());
115  if (pset and pset->exists("@shouldTryToContinue")) {
116  shouldTryToContinue_ = true;
117  }
118  }
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
static Registry * instance()
Definition: Registry.cc:12
bool shouldTryToContinue_
Definition: Worker.h:626

◆ clearCounters()

void edm::Worker::clearCounters ( )
inline

Definition at line 232 of file Worker.h.

References timesExcept_, timesFailed_, timesPassed_, timesRun_, and timesVisited_.

Referenced by edm::StreamSchedule::clearCounters().

232  {
233  timesRun_.store(0, std::memory_order_release);
234  timesVisited_.store(0, std::memory_order_release);
235  timesPassed_.store(0, std::memory_order_release);
236  timesFailed_.store(0, std::memory_order_release);
237  timesExcept_.store(0, std::memory_order_release);
238  }
std::atomic< int > timesVisited_
Definition: Worker.h:605
std::atomic< int > timesExcept_
Definition: Worker.h:608
std::atomic< int > timesFailed_
Definition: Worker.h:607
std::atomic< int > timesPassed_
Definition: Worker.h:606
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ clearModule()

void edm::Worker::clearModule ( )
inline

Definition at line 123 of file Worker.h.

References doClearModule(), and moduleValid_.

123  {
124  moduleValid_ = false;
125  doClearModule();
126  }
bool moduleValid_
Definition: Worker.h:625
virtual void doClearModule()=0

◆ consumesInfo()

virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual

◆ convertCurrentProcessAlias()

virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

◆ description()

ModuleDescription const* edm::Worker::description ( ) const
inline

◆ doClearModule()

virtual void edm::Worker::doClearModule ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by clearModule().

◆ doTransformAsync()

void edm::Worker::doTransformAsync ( WaitingTaskHolder  iTask,
size_t  iTransformIndex,
EventPrincipal const &  iPrincipal,
ServiceToken const &  iToken,
StreamID  ,
ModuleCallingContext const &  mcc,
StreamContext const *   
)

Definition at line 245 of file Worker.cc.

References edm::WaitingTaskHolder::doneWaiting(), edm::WaitingTaskHolder::group(), implDoTransformAsync(), itemToGetForTransform(), edm::make_waiting_task(), edm::ModuleCallingContext::parent(), class-composition::parent, edm::Principal::prefetchAsync(), and TrackValidation_cff::task.

Referenced by edm::TransformingProductResolver::prefetchAsync_().

251  {
252  ServiceWeakToken weakToken = iToken;
253 
254  //Need to make the services available early so other services can see them
255  auto task = make_waiting_task([this, iTask, weakToken, &iPrincipal, iTransformIndex, parent = mcc.parent()](
256  std::exception_ptr const* iExcept) mutable {
257  if (iExcept) {
258  iTask.doneWaiting(*iExcept);
259  return;
260  }
261  implDoTransformAsync(iTask, iTransformIndex, iPrincipal, parent, weakToken);
262  });
263 
264  //NOTE: need different ModuleCallingContext. The ProductResolver will copy the context in order to get
265  // a longer lifetime than this function call.
266  iPrincipal.prefetchAsync(
267  WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
268  }
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0

◆ doWorkAsync()

template<typename T >
void edm::Worker::doWorkAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 973 of file Worker.h.

References edm::WaitingTaskList::add(), visDQMUpload::context, watchdog::group, hasAcquire(), info(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), moduleCallingContext_, eostools::move(), AlCaHLTBitMon_ParallelJobs::p, prePrefetchSelectionAsync(), fetchall_from_DQM_v2::release, alignCSCRings::s, edm::ModuleCallingContext::setContext(), submitPVValidationJobs::t, TrackValidation_cff::task, timesVisited_, unpackBuffers-CaloStage2::token, waitingTasks_, and workStarted_.

Referenced by edm::UnscheduledProductResolver::prefetchAsync_(), and edm::WorkerInPath::runWorkerAsync().

978  {
979  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
980  return;
981  }
982 
983  //Need to check workStarted_ before adding to waitingTasks_
984  bool expected = false;
985  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
986 
988  if constexpr (T::isEvent_) {
989  timesVisited_.fetch_add(1, std::memory_order_relaxed);
990  }
991 
992  if (workStarted) {
994 
995  //if have TriggerResults based selection we want to reject the event before doing prefetching
996  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
997  //We need to run the selection in a different task so that
998  // we can prefetch the data needed for the selection
999  WaitingTask* moduleTask =
1000  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1001 
1002  //make sure the task is either run or destroyed
1003  struct DestroyTask {
1004  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1005 
1006  ~DestroyTask() {
1007  auto p = m_task.exchange(nullptr);
1008  if (p) {
1009  TaskSentry s{p};
1010  }
1011  }
1012 
1013  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1014 
1015  private:
1016  std::atomic<edm::WaitingTask*> m_task;
1017  };
1018  if constexpr (T::isEvent_) {
1019  if (hasAcquire()) {
1020  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1021  ServiceWeakToken weakToken = token;
1022  auto* group = task.group();
1023  moduleTask = make_waiting_task(
1024  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1025  WaitingTaskWithArenaHolder runTaskHolder(
1026  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1027  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1028  t.execute();
1029  });
1030  }
1031  }
1032  auto* group = task.group();
1033  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1034  ServiceWeakToken weakToken = token;
1035  auto selectionTask =
1036  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1037  std::exception_ptr const*) mutable {
1038  ServiceRegistry::Operate guard(weakToken.lock());
1039  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1040  weakToken.lock(),
1041  parentContext,
1042  info,
1043  T::transition_);
1044  });
1045  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1046  } else {
1047  WaitingTask* moduleTask =
1048  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1049  auto group = task.group();
1050  if constexpr (T::isEvent_) {
1051  if (hasAcquire()) {
1052  WaitingTaskWithArenaHolder runTaskHolder(
1053  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1054  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1055  }
1056  }
1057  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1058  }
1059  }
1060  }
virtual bool hasAcquire() const =0
std::atomic< int > timesVisited_
Definition: Worker.h:605
static const TGPicture * info(bool iBackgroundIsBlack)
std::atomic< bool > workStarted_
Definition: Worker.h:623
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:155
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
def move(src, dest)
Definition: eostools.py:511

◆ doWorkNoPrefetchingAsync()

template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  serviceToken,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1095 of file Worker.h.

References edm::WaitingTaskList::add(), CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), esPrefetchAsync(), watchdog::group, info(), edm::ModuleCallingContext::kPrefetching, edm::ServiceWeakToken::lock(), edm::make_waiting_task(), moduleCallingContext_, eostools::move(), needsESPrefetching(), createBeamHaloJobs::queue, serializeRunModule(), edm::ModuleCallingContext::setContext(), TrackValidation_cff::task, waitingTasks_, and workStarted_.

Referenced by edm::WorkerInPath::runWorkerAsync().

1100  {
1101  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1102  return;
1103  }
1104 
1105  //Need to check workStarted_ before adding to waitingTasks_
1106  bool expected = false;
1107  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1108 
1110  if (workStarted) {
1111  ServiceWeakToken weakToken = serviceToken;
1112  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1113  std::exception_ptr exceptionPtr;
1114  // Caught exception is propagated via WaitingTaskList
1115  CMS_SA_ALLOW try {
1116  //Need to make the services available
1117  ServiceRegistry::Operate guard(weakToken.lock());
1118 
1119  this->runModule<T>(info, streamID, parentContext, context);
1120  } catch (...) {
1121  exceptionPtr = std::current_exception();
1122  }
1123  this->waitingTasks_.doneWaiting(exceptionPtr);
1124  };
1125 
1126  if (needsESPrefetching(T::transition_)) {
1127  auto group = task.group();
1128  auto afterPrefetch =
1129  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1130  if (iExcept) {
1131  this->waitingTasks_.doneWaiting(*iExcept);
1132  } else {
1133  if (auto queue = this->serializeRunModule()) {
1134  queue.push(*group, toDo);
1135  } else {
1136  group->run(toDo);
1137  }
1138  }
1139  });
1142  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1143  } else {
1144  auto group = task.group();
1145  if (auto queue = this->serializeRunModule()) {
1146  queue.push(*group, toDo);
1147  } else {
1148  group->run(toDo);
1149  }
1150  }
1151  }
1152  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::atomic< bool > workStarted_
Definition: Worker.h:623
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:368
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:204
virtual TaskQueueAdaptor serializeRunModule()=0
def move(src, dest)
Definition: eostools.py:511

◆ edPrefetchAsync()

void edm::Worker::edPrefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
Principal const &  iPrincipal 
) const
private

Definition at line 229 of file Worker.cc.

References edm::Principal::branchType(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetFrom(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and unpackBuffers-CaloStage2::token.

Referenced by prefetchAsync().

229  {
230  // Prefetch products the module declares it consumes
231  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
232 
233  for (auto const& item : items) {
234  ProductResolverIndex productResolverIndex = item.productResolverIndex();
235  bool skipCurrentProcess = item.skipCurrentProcess();
236  if (productResolverIndex != ProductResolverIndexAmbiguous) {
237  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
238  }
239  }
240  }
unsigned int ProductResolverIndex
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613

◆ emitPostModuleEventPrefetchingSignal()

void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inlineprivate

Definition at line 372 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

372  {
373  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
374  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
StreamContext const * getStreamContext() const

◆ emitPostModuleGlobalPrefetchingSignal()

void edm::Worker::emitPostModuleGlobalPrefetchingSignal ( )
inlineprivate

Definition at line 381 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getGlobalContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

381  {
382  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
384  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
GlobalContext const * getGlobalContext() const

◆ emitPostModuleStreamPrefetchingSignal()

void edm::Worker::emitPostModuleStreamPrefetchingSignal ( )
inlineprivate

Definition at line 376 of file Worker.h.

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

376  {
377  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
379  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
StreamContext const * getStreamContext() const

◆ endJob()

void edm::Worker::endJob ( void  )

Definition at line 296 of file Worker.cc.

References actReg_, cms::Exception::addContext(), cms::cuda::assert(), submitPVResolutionJobs::desc, description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

296  {
297  try {
298  convertException::wrap([&]() {
299  ModuleDescription const* desc = description();
300  assert(desc != nullptr);
301  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
302  implEndJob();
303  });
304  } catch (cms::Exception& ex) {
305  state_ = Exception;
306  std::ostringstream ost;
307  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
308  ex.addContext(ost.str());
309  throw;
310  }
311  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
assert(be >=bs)
std::atomic< State > state_
Definition: Worker.h:609
virtual void implEndJob()=0
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ endStream()

void edm::Worker::endStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 337 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

337  {
338  try {
339  convertException::wrap([&]() {
340  streamContext.setTransition(StreamContext::Transition::kEndStream);
341  streamContext.setEventID(EventID(0, 0, 0));
342  streamContext.setRunIndex(RunIndex::invalidRunIndex());
343  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
344  streamContext.setTimestamp(Timestamp());
345  ParentContext parentContext(&streamContext);
346  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
348  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
349  implEndStream(id);
350  });
351  } catch (cms::Exception& ex) {
352  state_ = Exception;
353  std::ostringstream ost;
354  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
355  << "'";
356  ex.addContext(ost.str());
357  throw;
358  }
359  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
std::string const & moduleName() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:609
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::string const & moduleLabel() const

◆ esItemsToGetFrom()

virtual std::vector<ESResolverIndex> const& edm::Worker::esItemsToGetFrom ( Transition  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync(), and needsESPrefetching().

◆ esPrefetchAsync()

void edm::Worker::esPrefetchAsync ( WaitingTaskHolder  iTask,
EventSetupImpl const &  iImpl,
Transition  iTrans,
ServiceToken const &  iToken 
)
private

Definition at line 204 of file Worker.cc.

References cms::cuda::assert(), esItemsToGetFrom(), esRecordsToGetFrom(), edm::EventSetupImpl::findImpl(), mps_fire::i, mps_monitormerge::items, moduleCallingContext_, edm::NumberOfEventSetupTransitions, and edm::eventsetup::EventSetupRecordImpl::prefetchAsync().

Referenced by doWorkNoPrefetchingAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), and edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync().

207  {
209  return;
210  }
211  auto const& recs = esRecordsToGetFrom(iTrans);
212  auto const& items = esItemsToGetFrom(iTrans);
213 
214  assert(items.size() == recs.size());
215  if (items.empty()) {
216  return;
217  }
218 
219  for (size_t i = 0; i != items.size(); ++i) {
220  if (recs[i] != ESRecordIndex{}) {
221  auto rec = iImpl.findImpl(recs[i]);
222  if (rec) {
223  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
224  }
225  }
226  }
227  }
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0

◆ esRecordsToGetFrom()

virtual std::vector<ESRecordIndex> const& edm::Worker::esRecordsToGetFrom ( Transition  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync().

◆ globalLuminosityBlocksQueue()

virtual SerialTaskQueue* edm::Worker::globalLuminosityBlocksQueue ( )
pure virtual

◆ globalRunsQueue()

virtual SerialTaskQueue* edm::Worker::globalRunsQueue ( )
pure virtual

◆ handleExternalWorkException()

std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 424 of file Worker.cc.

References edm::exceptionContext(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

424  {
426  try {
427  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
428  } catch (cms::Exception& ex) {
429  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
431  return std::current_exception();
432  }
433  }
434  return iEPtr;
435  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool ranAcquireWithoutException_
Definition: Worker.h:624
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
auto wrap(F iFunc) -> decltype(iFunc())

◆ hasAccumulator()

virtual bool edm::Worker::hasAccumulator ( ) const
pure virtual

◆ hasAcquire()

virtual bool edm::Worker::hasAcquire ( ) const
privatepure virtual

◆ implBeginJob()

virtual void edm::Worker::implBeginJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

◆ implBeginStream()

virtual void edm::Worker::implBeginStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

◆ implDo()

virtual bool edm::Worker::implDo ( EventTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoAccessInputProcessBlock()

virtual bool edm::Worker::implDoAccessInputProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoAcquire()

virtual void edm::Worker::implDoAcquire ( EventTransitionInfo const &  ,
ModuleCallingContext const *  ,
WaitingTaskWithArenaHolder  
)
protectedpure virtual

◆ implDoBegin() [1/2]

virtual bool edm::Worker::implDoBegin ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoBegin() [2/2]

virtual bool edm::Worker::implDoBegin ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

◆ implDoBeginProcessBlock()

virtual bool edm::Worker::implDoBeginProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoEnd() [1/2]

virtual bool edm::Worker::implDoEnd ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoEnd() [2/2]

virtual bool edm::Worker::implDoEnd ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

◆ implDoEndProcessBlock()

virtual bool edm::Worker::implDoEndProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoPrePrefetchSelection()

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  ,
EventPrincipal const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoStreamBegin() [1/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoStreamBegin() [2/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

◆ implDoStreamEnd() [1/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

◆ implDoStreamEnd() [2/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

◆ implDoTransformAsync()

virtual void edm::Worker::implDoTransformAsync ( WaitingTaskHolder  ,
size_t  iTransformIndex,
EventPrincipal const &  ,
ParentContext const &  ,
ServiceWeakToken const &   
)
protectedpure virtual

◆ implEndJob()

virtual void edm::Worker::implEndJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

◆ implEndStream()

virtual void edm::Worker::implEndStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

◆ implNeedToRunSelection()

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protectedpure virtual

◆ implRegisterThinnedAssociations()

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper  
)
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by registerThinnedAssociations().

◆ implRespondToCloseInputFile()

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseInputFile().

◆ implRespondToCloseOutputFile()

virtual void edm::Worker::implRespondToCloseOutputFile ( )
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseOutputFile().

◆ implRespondToOpenInputFile()

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToOpenInputFile().

◆ itemsMayGet()

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGet()

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGetForSelection()

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protectedpure virtual

◆ itemsToGetFrom()

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by edPrefetchAsync().

◆ itemToGetForTransform()

virtual ProductResolverIndex edm::Worker::itemToGetForTransform ( size_t  iTransformIndex) const
protectedpure virtual

◆ moduleConcurrencyType()

virtual ConcurrencyTypes edm::Worker::moduleConcurrencyType ( ) const
pure virtual

◆ modulesWhoseProductsAreConsumed()

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &  modules,
std::vector< ModuleProcessName > &  modulesInPreviousProcesses,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const *> const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

◆ moduleType()

virtual Types edm::Worker::moduleType ( ) const
pure virtual

◆ needsESPrefetching()

bool edm::Worker::needsESPrefetching ( Transition  iTrans) const
inlineprivatenoexcept

Definition at line 368 of file Worker.h.

References esItemsToGetFrom(), and edm::NumberOfEventSetupTransitions.

Referenced by doWorkNoPrefetchingAsync().

368  {
369  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
370  }
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0

◆ operator=()

Worker& edm::Worker::operator= ( Worker const &  )
delete

◆ postDoEvent()

void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 379 of file Worker.cc.

References earlyDeleteHelper_, and iEvent.

379  {
380  if (earlyDeleteHelper_) {
381  earlyDeleteHelper_->moduleRan(iEvent);
382  }
383  }
int iEvent
Definition: GenABIO.cc:224
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620

◆ preActionBeforeRunEventAsync()

virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTaskHolder  iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

◆ prefetchAsync()

template<typename T >
void edm::Worker::prefetchAsync ( WaitingTaskHolder  iTask,
ServiceToken const &  token,
ParentContext const &  parentContext,
typename T::TransitionInfoType const &  transitionInfo,
Transition  iTransition 
)
private

Definition at line 947 of file Worker.h.

References actReg_, edm::Principal::branchType(), edPrefetchAsync(), edm::ModuleCallingContext::getGlobalContext(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::ModuleCallingContext::setContext(), and unpackBuffers-CaloStage2::token.

951  {
952  Principal const& principal = transitionInfo.principal();
953 
955 
956  if constexpr (T::isEvent_) {
957  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
958  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
959  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
960  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
961  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
962  }
963 
964  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
965  edPrefetchAsync(iTask, token, principal);
966 
967  if (principal.branchType() == InEvent) {
969  }
970  }
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:229
StreamContext const * getStreamContext() const
GlobalContext const * getGlobalContext() const

◆ prePrefetchSelectionAsync() [1/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  group,
WaitingTask task,
ServiceToken const &  token,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 155 of file Worker.cc.

References CMS_SA_ALLOW, edm::TaskBase::decrement_ref_count(), edm::WaitingTaskList::doneWaiting(), edm::TaskBase::execute(), watchdog::group, implDoPrePrefetchSelection(), edm::TaskBase::increment_ref_count(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetForSelection(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, alignCSCRings::s, timesRun_, unpackBuffers-CaloStage2::token, and waitingTasks_.

Referenced by doWorkAsync().

159  {
160  successTask->increment_ref_count();
161 
162  ServiceWeakToken weakToken = token;
163  auto choiceTask =
164  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
165  ServiceRegistry::Operate guard(weakToken.lock());
166  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
167  CMS_SA_ALLOW try {
168  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
169  timesRun_.fetch_add(1, std::memory_order_relaxed);
170  setPassed<true>();
171  waitingTasks_.doneWaiting(nullptr);
172  //TBB requires that destroyed tasks have count 0
173  if (0 == successTask->decrement_ref_count()) {
174  TaskSentry s(successTask);
175  }
176  return;
177  }
178  } catch (...) {
179  }
180  if (0 == successTask->decrement_ref_count()) {
181  group.run([successTask]() {
182  TaskSentry s(successTask);
183  successTask->execute();
184  });
185  }
186  });
187 
188  WaitingTaskHolder choiceHolder{group, choiceTask};
189 
190  std::vector<ProductResolverIndexAndSkipBit> items;
192 
193  for (auto const& item : items) {
194  ProductResolverIndex productResolverIndex = item.productResolverIndex();
195  bool skipCurrentProcess = item.skipCurrentProcess();
196  if (productResolverIndex != ProductResolverIndexAmbiguous) {
197  iPrincipal->prefetchAsync(
198  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
199  }
200  }
201  choiceHolder.doneWaiting(std::exception_ptr{});
202  }
#define CMS_SA_ALLOW
unsigned int ProductResolverIndex
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ prePrefetchSelectionAsync() [2/2]

void edm::Worker::prePrefetchSelectionAsync ( oneapi::tbb::task_group &  ,
WaitingTask task,
ServiceToken const &  ,
StreamID  stream,
void const *   
)
inline

Definition at line 141 of file Worker.h.

References cms::cuda::assert().

142  {
143  assert(false);
144  }
assert(be >=bs)

◆ registerThinnedAssociations()

void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper helper 
)

Definition at line 361 of file Worker.cc.

References cms::Exception::addContext(), description(), implRegisterThinnedAssociations(), and HerwigMaxPtPartonFilter_cfi::moduleLabel.

361  {
362  try {
364  } catch (cms::Exception& ex) {
365  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
366  throw ex;
367  }
368  }
Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ModuleDescription const * description() const
Definition: Worker.h:198
void addContext(std::string const &context)
Definition: Exception.cc:169

◆ reset()

void edm::Worker::reset ( void  )
inline

Definition at line 188 of file Worker.h.

References cached_exception_, numberOfPathsLeftToRun_, numberOfPathsOn_, Ready, edm::WaitingTaskList::reset(), state_, waitingTasks_, and workStarted_.

Referenced by edm::WorkerManager::resetAll().

188  {
189  cached_exception_ = std::exception_ptr();
190  state_ = Ready;
192  workStarted_ = false;
194  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
std::atomic< bool > workStarted_
Definition: Worker.h:623
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:616
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
std::atomic< State > state_
Definition: Worker.h:609
int numberOfPathsOn_
Definition: Worker.h:610

◆ resetModuleDescription()

void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected

Definition at line 270 of file Worker.cc.

References cms::cuda::assert(), checkForShouldTryToContinue(), moduleCallingContext_, edm::ModuleCallingContext::parent(), edm::ModuleCallingContext::previousModuleOnThread(), edm::ModuleCallingContext::state(), and groupFilesInBlocks::temp.

Referenced by edm::WorkerT< T >::setModule().

270  {
271  ModuleCallingContext temp(iDesc,
276  assert(iDesc);
278  }
ModuleCallingContext const * previousModuleOnThread() const
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
ParentContext const & parent() const

◆ resolvePutIndicies()

virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ respondToCloseInputFile()

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 184 of file Worker.h.

References implRespondToCloseInputFile().

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0

◆ respondToCloseOutputFile()

void edm::Worker::respondToCloseOutputFile ( )
inline

Definition at line 185 of file Worker.h.

References implRespondToCloseOutputFile().

virtual void implRespondToCloseOutputFile()=0

◆ respondToOpenInputFile()

void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 183 of file Worker.h.

References implRespondToOpenInputFile().

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0

◆ runAcquire()

void edm::Worker::runAcquire ( EventTransitionInfo const &  info,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder holder 
)
private

Definition at line 385 of file Worker.cc.

References edm::exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), shouldTryToContinue_, timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

387  {
388  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
389  try {
390  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
391  } catch (cms::Exception& ex) {
393  if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
394  timesRun_.fetch_add(1, std::memory_order_relaxed);
395  throw;
396  }
397  }
398  }
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
static const TGPicture * info(bool iBackgroundIsBlack)
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:604
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runAcquireAfterAsyncPrefetch()

void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
EventTransitionInfo const &  eventTransitionInfo,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 400 of file Worker.cc.

References CMS_SA_ALLOW, edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), shouldRethrowException(), and shouldTryToContinue_.

403  {
405  std::exception_ptr exceptionPtr;
406  if (iEPtr) {
407  if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
408  exceptionPtr = iEPtr;
409  }
411  } else {
412  // Caught exception is propagated via WaitingTaskWithArenaHolder
413  CMS_SA_ALLOW try {
414  runAcquire(eventTransitionInfo, parentContext, holder);
416  } catch (...) {
417  exceptionPtr = std::current_exception();
418  }
419  }
420  // It is important this is after runAcquire completely finishes
421  holder.doneWaiting(exceptionPtr);
422  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:624
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:385
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runModule()

template<typename T >
bool edm::Worker::runModule ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1155 of file Worker.h.

References actReg_, cms::cuda::assert(), cached_exception_, visDQMUpload::context, edm::exceptionContext(), moduleCallingContext_, shouldRethrowException(), shouldTryToContinue_, timesRun_, and edm::convertException::wrap().

1158  {
1159  //unscheduled producers should advance this
1160  //if (T::isEvent_) {
1161  // ++timesVisited_;
1162  //}
1163  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1164  if constexpr (T::isEvent_) {
1165  timesRun_.fetch_add(1, std::memory_order_relaxed);
1166  }
1167 
1168  bool rc = true;
1169  try {
1170  convertException::wrap([&]() {
1171  rc = workerhelper::CallImpl<T>::call(
1172  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1173 
1174  if (rc) {
1175  setPassed<T::isEvent_>();
1176  } else {
1177  setFailed<T::isEvent_>();
1178  }
1179  });
1180  } catch (cms::Exception& ex) {
1182  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1184  setException<T::isEvent_>(std::current_exception());
1185  std::rethrow_exception(cached_exception_);
1186  } else {
1187  rc = setPassed<T::isEvent_>();
1188  }
1189  }
1190 
1191  return rc;
1192  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
assert(be >=bs)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
std::exception_ptr cached_exception_
Definition: Worker.h:616
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:604
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runModuleAfterAsyncPrefetch()

template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr  iEPtr,
typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1063 of file Worker.h.

References CMS_SA_ALLOW, visDQMUpload::context, edm::WaitingTaskList::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, edm::ModuleCallingContext::setContext(), shouldRethrowException(), shouldTryToContinue_, and waitingTasks_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

1067  {
1068  std::exception_ptr exceptionPtr;
1069  bool shouldRun = true;
1070  if (iEPtr) {
1071  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1072  exceptionPtr = iEPtr;
1073  setException<T::isEvent_>(exceptionPtr);
1074  shouldRun = false;
1075  } else {
1076  if (not shouldTryToContinue_) {
1077  setPassed<T::isEvent_>();
1078  shouldRun = false;
1079  }
1080  }
1081  }
1082  if (shouldRun) {
1083  // Caught exception is propagated via WaitingTaskList
1084  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1085  exceptionPtr = std::current_exception();
1086  }
1087  } else {
1089  }
1090  waitingTasks_.doneWaiting(exceptionPtr);
1091  return exceptionPtr;
1092  }
#define CMS_SA_ALLOW
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
bool shouldTryToContinue_
Definition: Worker.h:626

◆ runModuleDirectly()

template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1195 of file Worker.h.

References visDQMUpload::context, and timesVisited_.

Referenced by edm::Path::finished().

1198  {
1199  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1200  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1201  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1202  }
std::atomic< int > timesVisited_
Definition: Worker.h:605

◆ selectInputProcessBlocks()

virtual void edm::Worker::selectInputProcessBlocks ( ProductRegistry const &  ,
ProcessBlockHelperBase const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ serializeRunModule()

virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
privatepure virtual

◆ setActivityRegistry()

void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry areg)

The signals are required to live longer than the last call to 'doWork' this was done to improve performance based on profiling

Definition at line 111 of file Worker.cc.

References actReg_.

111 { actReg_ = areg; }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618

◆ setEarlyDeleteHelper()

void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper iHelper)

Definition at line 242 of file Worker.cc.

References earlyDeleteHelper_.

242 { earlyDeleteHelper_ = iHelper; }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620

◆ setException()

template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inlineprivate

Definition at line 349 of file Worker.h.

References cached_exception_, Exception, state_, and timesExcept_.

349  {
350  if (IS_EVENT) {
351  timesExcept_.fetch_add(1, std::memory_order_relaxed);
352  }
353  cached_exception_ = iException; // propagate_const<T> has no reset() function
354  state_ = Exception;
355  return cached_exception_;
356  }
std::atomic< int > timesExcept_
Definition: Worker.h:608
std::exception_ptr cached_exception_
Definition: Worker.h:616
std::atomic< State > state_
Definition: Worker.h:609

◆ setFailed()

template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inlineprivate

Definition at line 340 of file Worker.h.

References Fail, state_, and timesFailed_.

340  {
341  if (IS_EVENT) {
342  timesFailed_.fetch_add(1, std::memory_order_relaxed);
343  }
344  state_ = Fail;
345  return false;
346  }
std::atomic< int > timesFailed_
Definition: Worker.h:607
std::atomic< State > state_
Definition: Worker.h:609

◆ setPassed()

template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inlineprivate

Definition at line 331 of file Worker.h.

References Pass, state_, and timesPassed_.

331  {
332  if (IS_EVENT) {
333  timesPassed_.fetch_add(1, std::memory_order_relaxed);
334  }
335  state_ = Pass;
336  return true;
337  }
std::atomic< State > state_
Definition: Worker.h:609
std::atomic< int > timesPassed_
Definition: Worker.h:606

◆ shouldRethrowException()

bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
bool  isTryToContinue 
) const
private

Definition at line 120 of file Worker.cc.

References writedatasetfile::action, actions_, cms::Exception::category(), edm::ExceptionToActionTable::find(), edm::exception_actions::IgnoreCompletely, edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::TryToContinue, and edm::convertException::wrap().

Referenced by runAcquire(), runAcquireAfterAsyncPrefetch(), runModule(), and runModuleAfterAsyncPrefetch().

123  {
124  // NOTE: the warning printed as a result of ignoring or failing
125  // a module will only be printed during the full true processing
126  // pass of this module
127 
128  // Get the action corresponding to this exception. However, if processing
129  // something other than an event (e.g. run, lumi) always rethrow.
130  if (not isEvent) {
131  return true;
132  }
133  try {
134  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
135  } catch (cms::Exception& ex) {
137 
139  return true;
140  }
142  if (shouldTryToContinue) {
143  edm::printCmsExceptionWarning("TryToContinue", ex);
144  }
145  return not shouldTryToContinue;
146  }
148  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
149  return false;
150  }
151  }
152  return true;
153  }
std::string const & category() const
Definition: Exception.cc:147
ExceptionToActionTable const * actions_
Definition: Worker.h:615
exception_actions::ActionCodes find(const std::string &category) const
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ skipOnPath()

void edm::Worker::skipOnPath ( EventPrincipal const &  iEvent)

Definition at line 370 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), earlyDeleteHelper_, iEvent, numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

370  {
371  if (earlyDeleteHelper_) {
372  earlyDeleteHelper_->pathFinished(iEvent);
373  }
374  if (0 == --numberOfPathsLeftToRun_) {
376  }
377  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
std::exception_ptr cached_exception_
Definition: Worker.h:616
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620

◆ state()

State edm::Worker::state ( ) const
inline

Definition at line 247 of file Worker.h.

References state_.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

247 { return state_; }
std::atomic< State > state_
Definition: Worker.h:609

◆ timesExcept()

int edm::Worker::timesExcept ( ) const
inline

Definition at line 246 of file Worker.h.

References timesExcept_.

246 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:608

◆ timesFailed()

int edm::Worker::timesFailed ( ) const
inline

Definition at line 245 of file Worker.h.

References timesFailed_.

245 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:607

◆ timesPass()

int edm::Worker::timesPass ( ) const
inline

Definition at line 249 of file Worker.h.

References timesPassed().

249 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:244

◆ timesPassed()

int edm::Worker::timesPassed ( ) const
inline

Definition at line 244 of file Worker.h.

References timesPassed_.

Referenced by timesPass().

244 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:606

◆ timesRun()

int edm::Worker::timesRun ( ) const
inline

Definition at line 242 of file Worker.h.

References timesRun_.

242 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:604

◆ timesVisited()

int edm::Worker::timesVisited ( ) const
inline

Definition at line 243 of file Worker.h.

References timesVisited_.

243 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:605

◆ transformIndex()

size_t edm::Worker::transformIndex ( edm::BranchDescription const &  ) const
pure virtual

◆ updateLookup() [1/2]

virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ updateLookup() [2/2]

virtual void edm::Worker::updateLookup ( eventsetup::ESRecordsToProductResolverIndices const &  )
pure virtual

Implemented in edm::WorkerT< T >.

◆ waitingTaskList()

edm::WaitingTaskList& edm::Worker::waitingTaskList ( )
inline

Definition at line 254 of file Worker.h.

References waitingTasks_.

254 { return waitingTasks_; }
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622

◆ wantsGlobalLuminosityBlocks()

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual

◆ wantsGlobalRuns()

virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual

◆ wantsInputProcessBlocks()

virtual bool edm::Worker::wantsInputProcessBlocks ( ) const
pure virtual

◆ wantsProcessBlocks()

virtual bool edm::Worker::wantsProcessBlocks ( ) const
pure virtual

◆ wantsStreamLuminosityBlocks()

virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual

◆ wantsStreamRuns()

virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual

◆ workerType()

virtual std::string edm::Worker::workerType ( ) const
protectedpure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

◆ workerhelper::CallImpl

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 258 of file Worker.h.

Member Data Documentation

◆ actions_

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 615 of file Worker.h.

Referenced by shouldRethrowException().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

◆ cached_exception_

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 616 of file Worker.h.

Referenced by reset(), runModule(), setException(), and skipOnPath().

◆ earlyDeleteHelper_

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 620 of file Worker.h.

Referenced by postDoEvent(), setEarlyDeleteHelper(), and skipOnPath().

◆ moduleCallingContext_

ModuleCallingContext edm::Worker::moduleCallingContext_
private

◆ moduleValid_

bool edm::Worker::moduleValid_ = true
private

Definition at line 625 of file Worker.h.

Referenced by clearModule(), and description().

◆ numberOfPathsLeftToRun_

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 611 of file Worker.h.

Referenced by reset(), and skipOnPath().

◆ numberOfPathsOn_

int edm::Worker::numberOfPathsOn_
private

Definition at line 610 of file Worker.h.

Referenced by addedToPath(), and reset().

◆ ranAcquireWithoutException_

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 624 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

◆ shouldTryToContinue_

bool edm::Worker::shouldTryToContinue_ = false
private

◆ state_

std::atomic<State> edm::Worker::state_
private

◆ timesExcept_

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 608 of file Worker.h.

Referenced by clearCounters(), setException(), and timesExcept().

◆ timesFailed_

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 607 of file Worker.h.

Referenced by clearCounters(), setFailed(), and timesFailed().

◆ timesPassed_

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 606 of file Worker.h.

Referenced by clearCounters(), setPassed(), and timesPassed().

◆ timesRun_

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 604 of file Worker.h.

Referenced by clearCounters(), prePrefetchSelectionAsync(), runAcquire(), runModule(), and timesRun().

◆ timesVisited_

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 605 of file Worker.h.

Referenced by clearCounters(), doWorkAsync(), runModuleDirectly(), and timesVisited().

◆ waitingTasks_

edm::WaitingTaskList edm::Worker::waitingTasks_
private

◆ workStarted_

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 623 of file Worker.h.

Referenced by doWorkAsync(), doWorkNoPrefetchingAsync(), and reset().