CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference abstract

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
void doWorkAsync (WaitingTask *, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual SerialTaskQueue * globalLuminosityBlocksQueue ()=0
 
virtual SerialTaskQueue * globalRunsQueue ()=0
 
virtual bool hasAccumulator () const =0
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath (EventPrincipal const &iEvent)
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual void updateLookup (eventsetup::ESRecordsToProxyIndices const &)=0
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsInputProcessBlocks () const =0
 
virtual bool wantsProcessBlocks () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoAccessInputProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual void implDoAcquire (EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
 
virtual bool implDoBegin (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBegin (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoBeginProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEnd (RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoEndProcessBlock (ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoPrePrefetchSelection (StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamBegin (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, LumiTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual bool implDoStreamEnd (StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void edPrefetchAsync (WaitingTask *, ServiceToken const &, Principal const &) const
 
void emitPostModuleEventPrefetchingSignal ()
 
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom (Transition) const =0
 
void esPrefetchAsync (WaitingTask *, EventSetupImpl const &, Transition, ServiceToken const &)
 
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom (Transition) const =0
 
std::exception_ptr handleExternalWorkException (std::exception_ptr const *iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
bool needsESPrefetching (Transition iTrans) const noexcept
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
template<typename T >
void prefetchAsync (WaitingTask *, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
 
void runAcquire (EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
 
template<typename T >
bool runModule (typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 88 of file Worker.h.

Member Enumeration Documentation

◆ State

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 90 of file Worker.h.

90 { Ready, Pass, Fail, Exception };

◆ Types

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 91 of file Worker.h.

Constructor & Destructor Documentation

◆ Worker() [1/2]

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 91 of file Worker.cc.

92  : timesRun_(0),
93  timesVisited_(0),
94  timesPassed_(0),
95  timesFailed_(0),
96  timesExcept_(0),
97  state_(Ready),
100  moduleCallingContext_(&iMD),
101  actions_(iActions),
103  actReg_(),
104  earlyDeleteHelper_(nullptr),
105  workStarted_(false),

◆ ~Worker()

edm::Worker::~Worker ( )
virtual

Definition at line 108 of file Worker.cc.

108 {}

◆ Worker() [2/2]

edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

◆ activityRegistry()

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 268 of file Worker.h.

268 { return actReg_.get(); }

References actReg_.

◆ addedToPath()

void edm::Worker::addedToPath ( )
inline

Definition at line 223 of file Worker.h.

223 { ++numberOfPathsOn_; }

References numberOfPathsOn_.

◆ beginJob()

void edm::Worker::beginJob ( void  )

Definition at line 349 of file Worker.cc.

349  {
350  try {
351  convertException::wrap([&]() {
352  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
353  implBeginJob();
354  });
355  } catch (cms::Exception& ex) {
356  state_ = Exception;
357  std::ostringstream ost;
358  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
359  ex.addContext(ost.str());
360  throw;
361  }
362  }

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob().

◆ beginStream()

void edm::Worker::beginStream ( StreamID  id,
StreamContext &  streamContext 
)

Definition at line 379 of file Worker.cc.

379  {
380  try {
381  convertException::wrap([&]() {
382  streamContext.setTransition(StreamContext::Transition::kBeginStream);
383  streamContext.setEventID(EventID(0, 0, 0));
384  streamContext.setRunIndex(RunIndex::invalidRunIndex());
385  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
386  streamContext.setTimestamp(Timestamp());
387  ParentContext parentContext(&streamContext);
388  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
390  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
391  implBeginStream(id);
392  });
393  } catch (cms::Exception& ex) {
394  state_ = Exception;
395  std::ostringstream ost;
396  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel()
397  << "'";
398  ex.addContext(ost.str());
399  throw;
400  }
401  }

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

◆ callWhenDoneAsync()

void edm::Worker::callWhenDoneAsync ( WaitingTask * task)
inline

◆ clearCounters()

void edm::Worker::clearCounters ( )
inline

Definition at line 215 of file Worker.h.

215  {
216  timesRun_.store(0, std::memory_order_release);
217  timesVisited_.store(0, std::memory_order_release);
218  timesPassed_.store(0, std::memory_order_release);
219  timesFailed_.store(0, std::memory_order_release);
220  timesExcept_.store(0, std::memory_order_release);
221  }

References timesExcept_, timesFailed_, timesPassed_, timesRun_, and timesVisited_.

Referenced by edm::StreamSchedule::clearCounters().

◆ consumesInfo()

virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual

◆ convertCurrentProcessAlias()

virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

◆ descPtr()

ModuleDescription const* edm::Worker::descPtr ( ) const
inline

◆ description()

ModuleDescription const& edm::Worker::description ( ) const
inline

◆ doWork()

template<typename T >
bool edm::Worker::doWork ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1077 of file Worker.h.

1080  {
1081  if constexpr (T::isEvent_) {
1082  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1083  }
1084  bool rc = false;
1085 
1086  switch (state_) {
1087  case Ready:
1088  break;
1089  case Pass:
1090  return true;
1091  case Fail:
1092  return false;
1093  case Exception: {
1094  std::rethrow_exception(cached_exception_);
1095  }
1096  }
1097 
1098  bool expected = false;
1099  if (not workStarted_.compare_exchange_strong(expected, true)) {
1100  //another thread beat us here
1101  auto waitTask = edm::make_empty_waiting_task();
1102  waitTask->increment_ref_count();
1103 
1104  waitingTasks_.add(waitTask.get());
1105 
1106  waitTask->wait_for_all();
1107 
1108  switch (state_) {
1109  case Ready:
1110  assert(false);
1111  case Pass:
1112  return true;
1113  case Fail:
1114  return false;
1115  case Exception: {
1116  std::rethrow_exception(cached_exception_);
1117  }
1118  }
1119  }
1120 
1121  //Need the context to be set until after any exception is resolved
1123 
1124  auto resetContext = [](ModuleCallingContext* iContext) {
1125  iContext->setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1126  };
1127  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_, resetContext);
1128 
1129  if constexpr (T::isEvent_) {
1130  //if have TriggerResults based selection we want to reject the event before doing prefetching
1131  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1132  auto waitTask = edm::make_empty_waiting_task();
1133  waitTask->set_ref_count(2);
1135  waitTask.get(), ServiceRegistry::instance().presentToken(), streamID, &transitionInfo.principal());
1136  waitTask->decrement_ref_count();
1137  waitTask->wait_for_all();
1138 
1139  if (state() != Ready) {
1140  //The selection must have rejected this event
1141  return true;
1142  }
1143  }
1144  auto waitTask = edm::make_empty_waiting_task();
1145  {
1146  //Make sure signal is sent once the prefetching is done
1147  // [the 'pre' signal was sent in prefetchAsync]
1148  //The purpose of this block is to send the signal after wait_for_all
1149  auto sentryFunc = [this](void*) { emitPostModuleEventPrefetchingSignal(); };
1150  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(), sentryFunc);
1151 
1152  //set count to 2 since wait_for_all requires value to not go to 0
1153  waitTask->set_ref_count(2);
1154 
1155  prefetchAsync<T>(
1156  waitTask.get(), ServiceRegistry::instance().presentToken(), parentContext, transitionInfo, T::transition_);
1157  waitTask->decrement_ref_count();
1158  waitTask->wait_for_all();
1159  }
1160  if (waitTask->exceptionPtr() != nullptr) {
1161  if (shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_)) {
1162  setException<T::isEvent_>(*waitTask->exceptionPtr());
1164  std::rethrow_exception(cached_exception_);
1165  } else {
1166  setPassed<T::isEvent_>();
1167  waitingTasks_.doneWaiting(nullptr);
1168  return true;
1169  }
1170  }
1171  }
1172 
1173  //successful prefetch so no reset necessary
1174  prefetchSentry.release();
1175  if (auto queue = serializeRunModule()) {
1176  auto serviceToken = ServiceRegistry::instance().presentToken();
1177  queue.pushAndWait([&]() {
1178  //Need to make the services available
1179  ServiceRegistry::Operate guard(serviceToken);
1180  // This try-catch is primarily for paranoia: runModule() deals internally with exceptions, except for those coming from Service signal actions, which are not supposed to throw exceptions
1181  CMS_SA_ALLOW try { rc = runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1182  }
1183  });
1184  } else {
1185  // This try-catch is primarily for paranoia: runModule() deals internally with exceptions, except for those coming from Service signal actions, which are not supposed to throw exceptions
1186  CMS_SA_ALLOW try { rc = runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1187  }
1188  }
1189  if (state_ == Exception) {
1191  std::rethrow_exception(cached_exception_);
1192  }
1193 
1194  waitingTasks_.doneWaiting(nullptr);
1195  return rc;
1196  }

References actReg_, edm::WaitingTaskList::add(), cms::cuda::assert(), cached_exception_, CMS_SA_ALLOW, edm::WaitingTaskList::doneWaiting(), emitPostModuleEventPrefetchingSignal(), Exception, Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), moduleCallingContext_, Pass, prePrefetchSelectionAsync(), edm::ServiceRegistry::presentToken(), createBeamHaloJobs::queue, Ready, serializeRunModule(), edm::ModuleCallingContext::setContext(), shouldRethrowException(), state(), state_, timesVisited_, waitingTasks_, and workStarted_.

Referenced by edm::UnscheduledProductResolver::resolveProduct_().

◆ doWorkAsync()

template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask * task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 920 of file Worker.h.

925  {
926  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
927  return;
928  }
929 
930  //Need to check workStarted_ before adding to waitingTasks_
931  bool expected = false;
932  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
933 
935  if constexpr (T::isEvent_) {
936  timesVisited_.fetch_add(1, std::memory_order_relaxed);
937  }
938 
939  if (workStarted) {
941 
942  //if have TriggerResults based selection we want to reject the event before doing prefetching
943  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
944  //We need to run the selection in a different task so that
945  // we can prefetch the data needed for the selection
946  auto runTask = new (tbb::task::allocate_root())
947  RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context);
948 
949  //make sure the task is either run or destroyed
950  struct DestroyTask {
951  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
952 
953  ~DestroyTask() {
954  auto p = m_task.load();
955  if (p) {
956  tbb::task::destroy(*p);
957  }
958  }
959 
961  auto t = m_task.load();
962  m_task.store(nullptr);
963  return t;
964  }
965 
966  std::atomic<edm::WaitingTask*> m_task;
967  };
968 
969  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
970  auto selectionTask = make_waiting_task(
971  tbb::task::allocate_root(),
972  [ownRunTask, parentContext, info = transitionInfo, token, this](std::exception_ptr const*) mutable {
974  prefetchAsync<T>(ownRunTask->release(), token, parentContext, info, T::transition_);
975  });
976  prePrefetchSelectionAsync(selectionTask, token, streamID, &transitionInfo.principal());
977  } else {
978  WaitingTask* moduleTask = new (tbb::task::allocate_root())
979  RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context);
980  if constexpr (T::isEvent_) {
981  if (hasAcquire()) {
982  WaitingTaskWithArenaHolder runTaskHolder(
983  new (tbb::task::allocate_root()) HandleExternalWorkExceptionTask(this, moduleTask, parentContext));
984  moduleTask = new (tbb::task::allocate_root())
985  AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
986  }
987  }
988  prefetchAsync<T>(moduleTask, token, parentContext, transitionInfo, T::transition_);
989  }
990  }
991  }

References edm::WaitingTaskList::add(), hasAcquire(), info(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), moduleCallingContext_, eostools::move(), AlCaHLTBitMon_ParallelJobs::p, prePrefetchSelectionAsync(), fetchall_from_DQM_v2::release, edm::ModuleCallingContext::setContext(), submitPVValidationJobs::t, TrackValidation_cff::task, timesVisited_, unpackBuffers-CaloStage2::token, waitingTasks_, and workStarted_.

Referenced by edm::UnscheduledProductResolver::prefetchAsync_(), and edm::WorkerInPath::runWorkerAsync().

◆ doWorkNoPrefetchingAsync()

template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask * task,
typename T::TransitionInfoType const &  transitionInfo,
ServiceToken const &  serviceToken,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1020 of file Worker.h.

1025  {
1026  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1027  return;
1028  }
1029 
1030  //Need to check workStarted_ before adding to waitingTasks_
1031  bool expected = false;
1032  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1033 
1035  if (workStarted) {
1036  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, serviceToken]() {
1037  std::exception_ptr exceptionPtr;
1038  // Caught exception is propagated via WaitingTaskList
1039  CMS_SA_ALLOW try {
1040  //Need to make the services available
1041  ServiceRegistry::Operate guard(serviceToken);
1042 
1043  this->runModule<T>(info, streamID, parentContext, context);
1044  } catch (...) {
1045  exceptionPtr = std::current_exception();
1046  }
1047  this->waitingTasks_.doneWaiting(exceptionPtr);
1048  };
1049 
1050  if (needsESPrefetching(T::transition_)) {
1051  auto afterPrefetch = edm::make_waiting_task(
1052  tbb::task::allocate_root(), [toDo = std::move(toDo), this](std::exception_ptr const* iExcept) {
1053  if (iExcept) {
1054  this->waitingTasks_.doneWaiting(*iExcept);
1055  } else {
1056  if (auto queue = this->serializeRunModule()) {
1057  queue.push(toDo);
1058  } else {
1059  auto taskToDo = make_functor_task(tbb::task::allocate_root(), toDo);
1060  tbb::task::spawn(*taskToDo);
1061  }
1062  }
1063  });
1064  esPrefetchAsync(afterPrefetch, transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1065  } else {
1066  if (auto queue = this->serializeRunModule()) {
1067  queue.push(toDo);
1068  } else {
1069  auto taskToDo = make_functor_task(tbb::task::allocate_root(), toDo);
1070  tbb::task::spawn(*taskToDo);
1071  }
1072  }
1073  }
1074  }

References edm::WaitingTaskList::add(), CMS_SA_ALLOW, edm::WaitingTaskList::doneWaiting(), esPrefetchAsync(), info(), edm::make_functor_task(), edm::make_waiting_task(), eostools::move(), needsESPrefetching(), createBeamHaloJobs::queue, serializeRunModule(), TrackValidation_cff::task, waitingTasks_, and workStarted_.

Referenced by edm::WorkerInPath::runWorkerAsync().

◆ edPrefetchAsync()

void edm::Worker::edPrefetchAsync ( WaitingTask * iTask,
ServiceToken const &  token,
Principal const &  iPrincipal 
) const
private

Definition at line 326 of file Worker.cc.

326  {
327  // Prefetch products the module declares it consumes
328  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
329 
330  for (auto const& item : items) {
331  ProductResolverIndex productResolverIndex = item.productResolverIndex();
332  bool skipCurrentProcess = item.skipCurrentProcess();
333  if (productResolverIndex != ProductResolverIndexAmbiguous) {
334  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
335  }
336  }
337  }

References edm::Principal::branchType(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetFrom(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and unpackBuffers-CaloStage2::token.

Referenced by prefetchAsync().

◆ emitPostModuleEventPrefetchingSignal()

void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 337 of file Worker.h.

337  {
338  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
339  }

References actReg_, edm::ModuleCallingContext::getStreamContext(), and moduleCallingContext_.

Referenced by doWork(), and edm::Worker::RunModuleTask< T >::execute().

◆ endJob()

void edm::Worker::endJob ( void  )

Definition at line 364 of file Worker.cc.

364  {
365  try {
366  convertException::wrap([&]() {
367  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
368  implEndJob();
369  });
370  } catch (cms::Exception& ex) {
371  state_ = Exception;
372  std::ostringstream ost;
373  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
374  ex.addContext(ost.str());
375  throw;
376  }
377  }

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

◆ endStream()

void edm::Worker::endStream ( StreamID  id,
StreamContext &  streamContext 
)

Definition at line 403 of file Worker.cc.

403  {
404  try {
405  convertException::wrap([&]() {
406  streamContext.setTransition(StreamContext::Transition::kEndStream);
407  streamContext.setEventID(EventID(0, 0, 0));
408  streamContext.setRunIndex(RunIndex::invalidRunIndex());
409  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
410  streamContext.setTimestamp(Timestamp());
411  ParentContext parentContext(&streamContext);
412  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
414  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
415  implEndStream(id);
416  });
417  } catch (cms::Exception& ex) {
418  state_ = Exception;
419  std::ostringstream ost;
420  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel()
421  << "'";
422  ex.addContext(ost.str());
423  throw;
424  }
425  }

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

◆ esItemsToGetFrom()

virtual std::vector<ESProxyIndex> const& edm::Worker::esItemsToGetFrom ( Transition  ) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync(), and needsESPrefetching().

◆ esPrefetchAsync()

void edm::Worker::esPrefetchAsync ( WaitingTask * iTask,
EventSetupImpl const &  iImpl,
Transition  iTrans,
ServiceToken const &  iToken 
)
private

Definition at line 247 of file Worker.cc.

250  {
252  return;
253  }
254  auto const& recs = esRecordsToGetFrom(iTrans);
255  auto const& items = esItemsToGetFrom(iTrans);
256 
257  assert(items.size() == recs.size());
258  if (items.empty()) {
259  return;
260  }
261 
262  //Thread case of 1 thread special. The master thread is doing a wait_for_all on the
263  // default tbb arena. It will not process any tasks on the es arena. We need to add a
264  // task that will synchronously do a wait_for_all in the es arena to be sure prefetching
265  // will work.
266 
267  if UNLIKELY (tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) == 1) {
268  //We spawn this first so that the other ES tasks are before it in the TBB queue
269  tbb::task_arena edArena(tbb::task_arena::attach{});
270  tbb::task::spawn(
271  *make_functor_task(tbb::task::allocate_root(),
272  [this, task = edm::WaitingTaskHolder(iTask), iTrans, &iImpl, iToken, edArena]() mutable {
273  esTaskArena().execute([this, iTrans, &iImpl, iToken, task = std::move(task), edArena]() {
274  auto waitTask = edm::make_empty_waiting_task();
275  auto const& recs = esRecordsToGetFrom(iTrans);
276  auto const& items = esItemsToGetFrom(iTrans);
277  waitTask->set_ref_count(2);
278  for (size_t i = 0; i != items.size(); ++i) {
279  if (recs[i] != ESRecordIndex{}) {
280  auto rec = iImpl.findImpl(recs[i]);
281  if (rec) {
282  rec->prefetchAsync(waitTask.get(), items[i], &iImpl, iToken);
283  }
284  }
285  }
286  waitTask->decrement_ref_count();
287  waitTask->wait_for_all();
288 
289  auto exPtr = waitTask->exceptionPtr();
290  tbb::task_arena(edArena).execute([task, exPtr]() {
291  auto t = task;
292  if (exPtr) {
293  t.doneWaiting(*exPtr);
294  } else {
295  t.doneWaiting(std::exception_ptr{});
296  }
297  });
298  });
299  }));
300  } else {
301  //We need iTask to run in the default arena since it is not an ES task
302  auto task =
303  make_waiting_task(tbb::task::allocate_root(),
304  [holder = WaitingTaskWithArenaHolder{iTask}](std::exception_ptr const* iExcept) mutable {
305  if (iExcept) {
306  holder.doneWaiting(*iExcept);
307  } else {
308  holder.doneWaiting(std::exception_ptr{});
309  }
310  });
311 
312  WaitingTaskHolder tempH(task);
313  esTaskArena().execute([&]() {
314  for (size_t i = 0; i != items.size(); ++i) {
315  if (recs[i] != ESRecordIndex{}) {
316  auto rec = iImpl.findImpl(recs[i]);
317  if (rec) {
318  rec->prefetchAsync(task, items[i], &iImpl, iToken);
319  }
320  }
321  }
322  });
323  }
324  }

References cms::cuda::assert(), esItemsToGetFrom(), esRecordsToGetFrom(), edm::esTaskArena(), edm::EventSetupImpl::findImpl(), mps_fire::i, mps_monitormerge::items, edm::make_empty_waiting_task(), edm::make_functor_task(), edm::make_waiting_task(), eostools::move(), edm::NumberOfEventSetupTransitions, edm::eventsetup::EventSetupRecordImpl::prefetchAsync(), submitPVValidationJobs::t, TrackValidation_cff::task, and UNLIKELY.

Referenced by doWorkNoPrefetchingAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync(), edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync(), and edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync().

◆ esRecordsToGetFrom()

virtual std::vector<ESRecordIndex> const& edm::Worker::esRecordsToGetFrom ( Transition  ) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by esPrefetchAsync().

◆ exceptionContext()

void edm::Worker::exceptionContext ( cms::Exception ex,
ModuleCallingContext const *  mcc 
)
static, private

Definition at line 112 of file Worker.cc.

112  {
113  ModuleCallingContext const* imcc = mcc;
114  while ((imcc->type() == ParentContext::Type::kModule) or (imcc->type() == ParentContext::Type::kInternal)) {
115  std::ostringstream iost;
116  if (imcc->state() == ModuleCallingContext::State::kPrefetching) {
117  iost << "Prefetching for module ";
118  } else {
119  iost << "Calling method for module ";
120  }
121  iost << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
122 
123  if (imcc->type() == ParentContext::Type::kInternal) {
124  iost << " (probably inside some kind of mixing module)";
125  imcc = imcc->internalContext()->moduleCallingContext();
126  } else {
127  imcc = imcc->moduleCallingContext();
128  }
129  ex.addContext(iost.str());
130  }
131  std::ostringstream ost;
132  if (imcc->state() == ModuleCallingContext::State::kPrefetching) {
133  ost << "Prefetching for module ";
134  } else {
135  ost << "Calling method for module ";
136  }
137  ost << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
138  ex.addContext(ost.str());
139 
140  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
141  ost.str("");
142  ost << "Running path '";
143  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
144  ex.addContext(ost.str());
145  auto streamContext = imcc->placeInPathContext()->pathContext()->streamContext();
146  if (streamContext) {
147  ost.str("");
148  edm::exceptionContext(ost, *streamContext);
149  ex.addContext(ost.str());
150  }
151  } else {
152  if (imcc->type() == ParentContext::Type::kStream) {
153  ost.str("");
154  edm::exceptionContext(ost, *(imcc->streamContext()));
155  ex.addContext(ost.str());
156  } else if (imcc->type() == ParentContext::Type::kGlobal) {
157  ost.str("");
158  edm::exceptionContext(ost, *(imcc->globalContext()));
159  ex.addContext(ost.str());
160  }
161  }
162  }

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

Referenced by handleExternalWorkException(), runAcquire(), and runModule().

◆ globalLuminosityBlocksQueue()

virtual SerialTaskQueue* edm::Worker::globalLuminosityBlocksQueue ( )
pure virtual

◆ globalRunsQueue()

virtual SerialTaskQueue* edm::Worker::globalRunsQueue ( )
pure virtual

◆ handleExternalWorkException()

std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr const *  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 491 of file Worker.cc.

492  {
494  try {
495  convertException::wrap([iEPtr]() { std::rethrow_exception(*iEPtr); });
496  } catch (cms::Exception& ex) {
497  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
499  return std::current_exception();
500  }
501  }
502  return *iEPtr;
503  }

References exceptionContext(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

◆ hasAccumulator()

virtual bool edm::Worker::hasAccumulator ( ) const
pure virtual

◆ hasAcquire()

virtual bool edm::Worker::hasAcquire ( ) const
private, pure virtual

◆ implBeginJob()

virtual void edm::Worker::implBeginJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

◆ implBeginStream()

virtual void edm::Worker::implBeginStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

◆ implDo()

virtual bool edm::Worker::implDo ( EventTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoAccessInputProcessBlock()

virtual bool edm::Worker::implDoAccessInputProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoAcquire()

virtual void edm::Worker::implDoAcquire ( EventTransitionInfo const &  ,
ModuleCallingContext const *  ,
WaitingTaskWithArenaHolder  
)
protected, pure virtual

◆ implDoBegin() [1/2]

virtual bool edm::Worker::implDoBegin ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoBegin() [2/2]

virtual bool edm::Worker::implDoBegin ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoBeginProcessBlock()

virtual bool edm::Worker::implDoBeginProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoEnd() [1/2]

virtual bool edm::Worker::implDoEnd ( LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoEnd() [2/2]

virtual bool edm::Worker::implDoEnd ( RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoEndProcessBlock()

virtual bool edm::Worker::implDoEndProcessBlock ( ProcessBlockPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoPrePrefetchSelection()

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  ,
EventPrincipal const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamBegin() [1/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoStreamBegin() [2/2]

virtual bool edm::Worker::implDoStreamBegin ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implDoStreamEnd() [1/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
LumiTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

◆ implDoStreamEnd() [2/2]

virtual bool edm::Worker::implDoStreamEnd ( StreamID  ,
RunTransitionInfo const &  ,
ModuleCallingContext const *   
)
protected, pure virtual

◆ implEndJob()

virtual void edm::Worker::implEndJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

◆ implEndStream()

virtual void edm::Worker::implEndStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

◆ implNeedToRunSelection()

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protected, pure virtual

◆ implRegisterThinnedAssociations()

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper  
)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by registerThinnedAssociations().

◆ implRespondToCloseInputFile()

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToCloseInputFile().

◆ implRespondToOpenInputFile()

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by respondToOpenInputFile().

◆ itemsMayGet()

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsShouldPutInEvent()

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGet()

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

◆ itemsToGetForSelection()

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protected, pure virtual

◆ itemsToGetFrom()

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edPrefetchAsync().

◆ modulesWhoseProductsAreConsumed()

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

◆ moduleType()

virtual Types edm::Worker::moduleType ( ) const
pure virtual

◆ needsESPrefetching()

bool edm::Worker::needsESPrefetching ( Transition  iTrans) const
inline, private, noexcept

Definition at line 333 of file Worker.h.

333  {
334  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
335  }

References esItemsToGetFrom(), and edm::NumberOfEventSetupTransitions.

Referenced by doWorkNoPrefetchingAsync().

◆ operator=()

Worker& edm::Worker::operator= ( Worker const &  )
delete

◆ postDoEvent()

void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 445 of file Worker.cc.

445  {
446  if (earlyDeleteHelper_) {
447  earlyDeleteHelper_->moduleRan(iEvent);
448  }
449  }

References earlyDeleteHelper_, and iEvent.

◆ preActionBeforeRunEventAsync()

virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask *  iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

◆ prefetchAsync()

template<typename T >
void edm::Worker::prefetchAsync ( WaitingTask *  iTask,
ServiceToken const &  token,
ParentContext const &  parentContext,
typename T::TransitionInfoType const &  transitionInfo,
Transition  iTransition 
)
private

Definition at line 890 of file Worker.h.

894  {
895  Principal const& principal = transitionInfo.principal();
896 
898 
899  if (principal.branchType() == InEvent) {
900  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
901  }
902 
903  //Need to be sure the ref count isn't set to 0 immediately
904  iTask->increment_ref_count();
905 
906  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
907  edPrefetchAsync(iTask, token, principal);
908 
909  if (principal.branchType() == InEvent) {
911  }
912 
913  if (0 == iTask->decrement_ref_count()) {
914  //if everything finishes before we leave this routine, we need to launch the task
915  tbb::task::spawn(*iTask);
916  }
917  }

References actReg_, edm::Principal::branchType(), edPrefetchAsync(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::ModuleCallingContext::setContext(), and unpackBuffers-CaloStage2::token.

◆ prePrefetchSelectionAsync() [1/2]

void edm::Worker::prePrefetchSelectionAsync ( WaitingTask *  task,
ServiceToken const &  token,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 204 of file Worker.cc.

207  {
208  successTask->increment_ref_count();
209 
210  auto choiceTask = edm::make_waiting_task(
211  tbb::task::allocate_root(), [id, successTask, iPrincipal, this, token](std::exception_ptr const*) {
213  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
214  CMS_SA_ALLOW try {
215  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
216  timesRun_.fetch_add(1, std::memory_order_relaxed);
217  setPassed<true>();
218  waitingTasks_.doneWaiting(nullptr);
219  //TBB requires that destroyed tasks have count 0
220  if (0 == successTask->decrement_ref_count()) {
221  tbb::task::destroy(*successTask);
222  }
223  return;
224  }
225  } catch (...) {
226  }
227  if (0 == successTask->decrement_ref_count()) {
228  tbb::task::spawn(*successTask);
229  }
230  });
231 
232  WaitingTaskHolder choiceHolder{choiceTask};
233 
234  std::vector<ProductResolverIndexAndSkipBit> items;
236 
237  for (auto const& item : items) {
238  ProductResolverIndex productResolverIndex = item.productResolverIndex();
239  bool skipCurrentProcess = item.skipCurrentProcess();
240  if (productResolverIndex != ProductResolverIndexAmbiguous) {
241  iPrincipal->prefetchAsync(choiceTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
242  }
243  }
244  choiceHolder.doneWaiting(std::exception_ptr{});
245  }

References CMS_SA_ALLOW, edm::WaitingTaskList::doneWaiting(), implDoPrePrefetchSelection(), B2GTnPMonitor_cfi::item, mps_monitormerge::items, itemsToGetForSelection(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, timesRun_, unpackBuffers-CaloStage2::token, and waitingTasks_.

Referenced by doWork(), and doWorkAsync().

◆ prePrefetchSelectionAsync() [2/2]

void edm::Worker::prePrefetchSelectionAsync ( WaitingTask *  task,
ServiceToken const &  ,
StreamID  stream,
void const *   
)
inline

Definition at line 141 of file Worker.h.

141  {
142  assert(false);
143  }

References cms::cuda::assert().

◆ registerThinnedAssociations()

void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper helper 
)

Definition at line 427 of file Worker.cc.

427  {
428  try {
430  } catch (cms::Exception& ex) {
431  ex.addContext("Calling registerThinnedAssociations() for module " + description().moduleLabel());
432  throw ex;
433  }
434  }

References cms::Exception::addContext(), description(), implRegisterThinnedAssociations(), and HerwigMaxPtPartonFilter_cfi::moduleLabel.

◆ reset()

void edm::Worker::reset ( void  )
inline

Definition at line 178 of file Worker.h.

178  {
179  cached_exception_ = std::exception_ptr();
180  state_ = Ready;
182  workStarted_ = false;
184  }

References cached_exception_, numberOfPathsLeftToRun_, numberOfPathsOn_, Ready, edm::WaitingTaskList::reset(), state_, waitingTasks_, and workStarted_.

Referenced by edm::WorkerManager::resetAll().

◆ resetModuleDescription()

void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected

◆ resolvePutIndicies()

virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ respondToCloseInputFile()

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

◆ respondToOpenInputFile()

void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

◆ runAcquire()

void edm::Worker::runAcquire ( EventTransitionInfo const &  info,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder holder 
)
private

Definition at line 451 of file Worker.cc.

453  {
454  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
455  try {
456  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
457  } catch (cms::Exception& ex) {
459  if (shouldRethrowException(std::current_exception(), parentContext, true)) {
460  timesRun_.fetch_add(1, std::memory_order_relaxed);
461  throw;
462  }
463  }
464  }

References exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

◆ runAcquireAfterAsyncPrefetch()

void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
EventTransitionInfo const &  eventTransitionInfo,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 466 of file Worker.cc.

469  {
471  std::exception_ptr exceptionPtr;
472  if (iEPtr) {
473  assert(*iEPtr);
474  if (shouldRethrowException(*iEPtr, parentContext, true)) {
475  exceptionPtr = *iEPtr;
476  }
478  } else {
479  // Caught exception is propagated via WaitingTaskWithArenaHolder
480  CMS_SA_ALLOW try {
481  runAcquire(eventTransitionInfo, parentContext, holder);
483  } catch (...) {
484  exceptionPtr = std::current_exception();
485  }
486  }
487  // It is important this is after runAcquire completely finishes
488  holder.doneWaiting(exceptionPtr);
489  }

References cms::cuda::assert(), CMS_SA_ALLOW, edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), and shouldRethrowException().

◆ runModule()

template<typename T >
bool edm::Worker::runModule ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1199 of file Worker.h.

1202  {
1203  //unscheduled producers should advance this
1204  //if (T::isEvent_) {
1205  // ++timesVisited_;
1206  //}
1207  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1208  if constexpr (T::isEvent_) {
1209  timesRun_.fetch_add(1, std::memory_order_relaxed);
1210  }
1211 
1212  bool rc = true;
1213  try {
1214  convertException::wrap([&]() {
1215  rc = workerhelper::CallImpl<T>::call(
1216  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1217 
1218  if (rc) {
1219  setPassed<T::isEvent_>();
1220  } else {
1221  setFailed<T::isEvent_>();
1222  }
1223  });
1224  } catch (cms::Exception& ex) {
1226  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1228  setException<T::isEvent_>(std::current_exception());
1229  std::rethrow_exception(cached_exception_);
1230  } else {
1231  rc = setPassed<T::isEvent_>();
1232  }
1233  }
1234 
1235  return rc;
1236  }

References actReg_, cms::cuda::assert(), cached_exception_, exceptionContext(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

◆ runModuleAfterAsyncPrefetch()

template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 994 of file Worker.h.

998  {
999  std::exception_ptr exceptionPtr;
1000  if (iEPtr) {
1001  assert(*iEPtr);
1002  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
1003  exceptionPtr = *iEPtr;
1004  setException<T::isEvent_>(exceptionPtr);
1005  } else {
1006  setPassed<T::isEvent_>();
1007  }
1009  } else {
1010  // Caught exception is propagated via WaitingTaskList
1011  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1012  exceptionPtr = std::current_exception();
1013  }
1014  }
1015  waitingTasks_.doneWaiting(exceptionPtr);
1016  return exceptionPtr;
1017  }

References cms::cuda::assert(), CMS_SA_ALLOW, edm::WaitingTaskList::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, edm::ModuleCallingContext::setContext(), shouldRethrowException(), and waitingTasks_.

Referenced by edm::Worker::RunModuleTask< T >::execute().

◆ runModuleDirectly()

template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::TransitionInfoType const &  transitionInfo,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1239 of file Worker.h.

1242  {
1243  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1244  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1245  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1246  }

References timesVisited_.

Referenced by edm::Path::finished().

◆ serializeRunModule()

virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
private, pure virtual

◆ setActivityRegistry()

void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry >  areg)

The signals are required to live longer than the last call to 'doWork'. This was done to improve performance, based on profiling.

Definition at line 110 of file Worker.cc.

110 { actReg_ = areg; }

References actReg_.

◆ setEarlyDeleteHelper()

void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper *  iHelper)

Definition at line 339 of file Worker.cc.

339 { earlyDeleteHelper_ = iHelper; }

References earlyDeleteHelper_.

◆ setException()

template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inline, private

Definition at line 317 of file Worker.h.

317  {
318  if (IS_EVENT) {
319  timesExcept_.fetch_add(1, std::memory_order_relaxed);
320  }
321  cached_exception_ = iException; // propagate_const<T> has no reset() function
322  state_ = Exception;
323  return cached_exception_;
324  }

References cached_exception_, Exception, state_, and timesExcept_.

◆ setFailed()

template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inline, private

Definition at line 308 of file Worker.h.

308  {
309  if (IS_EVENT) {
310  timesFailed_.fetch_add(1, std::memory_order_relaxed);
311  }
312  state_ = Fail;
313  return false;
314  }

References Fail, state_, and timesFailed_.

◆ setPassed()

template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inlineprivate

Definition at line 299 of file Worker.h.

299  {
300  if (IS_EVENT) {
301  timesPassed_.fetch_add(1, std::memory_order_relaxed);
302  }
303  state_ = Pass;
304  return true;
305  }

References Pass, state_, and timesPassed_.

◆ shouldRethrowException()

bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent 
) const
private

Definition at line 164 of file Worker.cc.

164  {
165  // NOTE: the warning printed as a result of ignoring or failing
166  // a module will only be printed during the full true processing
167  // pass of this module
168 
169  // Get the action corresponding to this exception. However, if processing
170  // something other than an event (e.g. run, lumi) always rethrow.
171  if (not isEvent) {
172  return true;
173  }
174  try {
175  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
176  } catch (cms::Exception& ex) {
178 
180  return true;
181  }
182 
183  ModuleCallingContext tempContext(&description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
184 
185  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
186  // as IgnoreCompletely, so any subsequent OutputModules are still run.
187  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
188  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
189  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
190  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
194  }
195  }
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
201  return true;
202  }

References writedatasetfile::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

Referenced by doWork(), runAcquire(), runAcquireAfterAsyncPrefetch(), runModule(), and runModuleAfterAsyncPrefetch().

◆ skipOnPath()

void edm::Worker::skipOnPath ( EventPrincipal const &  iEvent)

Definition at line 436 of file Worker.cc.

436  {
437  if (earlyDeleteHelper_) {
438  earlyDeleteHelper_->pathFinished(iEvent);
439  }
440  if (0 == --numberOfPathsLeftToRun_) {
442  }
443  }

References cached_exception_, edm::WaitingTaskList::doneWaiting(), earlyDeleteHelper_, iEvent, numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

◆ state()

State edm::Worker::state ( ) const
inline

Definition at line 230 of file Worker.h.

230 { return state_; }

References state_.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker(), and doWork().

◆ timesExcept()

int edm::Worker::timesExcept ( ) const
inline

Definition at line 229 of file Worker.h.

229 { return timesExcept_.load(std::memory_order_acquire); }

References timesExcept_.

◆ timesFailed()

int edm::Worker::timesFailed ( ) const
inline

Definition at line 228 of file Worker.h.

228 { return timesFailed_.load(std::memory_order_acquire); }

References timesFailed_.

◆ timesPass()

int edm::Worker::timesPass ( ) const
inline

Definition at line 232 of file Worker.h.

232 { return timesPassed(); } // for backward compatibility only - to be removed soon

References timesPassed().

◆ timesPassed()

int edm::Worker::timesPassed ( ) const
inline

Definition at line 227 of file Worker.h.

227 { return timesPassed_.load(std::memory_order_acquire); }

References timesPassed_.

Referenced by timesPass().

◆ timesRun()

int edm::Worker::timesRun ( ) const
inline

Definition at line 225 of file Worker.h.

225 { return timesRun_.load(std::memory_order_acquire); }

References timesRun_.

◆ timesVisited()

int edm::Worker::timesVisited ( ) const
inline

Definition at line 226 of file Worker.h.

226 { return timesVisited_.load(std::memory_order_acquire); }

References timesVisited_.

◆ updateLookup() [1/2]

virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

◆ updateLookup() [2/2]

virtual void edm::Worker::updateLookup ( eventsetup::ESRecordsToProxyIndices const &  )
pure virtual

Implemented in edm::WorkerT< T >.

◆ wantsGlobalLuminosityBlocks()

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual

◆ wantsGlobalRuns()

virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual

◆ wantsInputProcessBlocks()

virtual bool edm::Worker::wantsInputProcessBlocks ( ) const
pure virtual

◆ wantsProcessBlocks()

virtual bool edm::Worker::wantsProcessBlocks ( ) const
pure virtual

◆ wantsStreamLuminosityBlocks()

virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual

◆ wantsStreamRuns()

virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual

◆ workerType()

virtual std::string edm::Worker::workerType ( ) const
protected, pure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

◆ workerhelper::CallImpl

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 238 of file Worker.h.

Member Data Documentation

◆ actions_

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 560 of file Worker.h.

Referenced by shouldRethrowException().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

◆ cached_exception_

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 561 of file Worker.h.

Referenced by doWork(), reset(), runModule(), setException(), and skipOnPath().

◆ earlyDeleteHelper_

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 565 of file Worker.h.

Referenced by postDoEvent(), setEarlyDeleteHelper(), and skipOnPath().

◆ moduleCallingContext_

ModuleCallingContext edm::Worker::moduleCallingContext_
private

◆ numberOfPathsLeftToRun_

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 556 of file Worker.h.

Referenced by reset(), and skipOnPath().

◆ numberOfPathsOn_

int edm::Worker::numberOfPathsOn_
private

Definition at line 555 of file Worker.h.

Referenced by addedToPath(), and reset().

◆ ranAcquireWithoutException_

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 569 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

◆ state_

std::atomic<State> edm::Worker::state_
private

◆ timesExcept_

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 553 of file Worker.h.

Referenced by clearCounters(), setException(), and timesExcept().

◆ timesFailed_

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 552 of file Worker.h.

Referenced by clearCounters(), setFailed(), and timesFailed().

◆ timesPassed_

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 551 of file Worker.h.

Referenced by clearCounters(), setPassed(), and timesPassed().

◆ timesRun_

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 549 of file Worker.h.

Referenced by clearCounters(), prePrefetchSelectionAsync(), runAcquire(), runModule(), and timesRun().

◆ timesVisited_

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 550 of file Worker.h.

Referenced by clearCounters(), doWork(), doWorkAsync(), runModuleDirectly(), and timesVisited().

◆ waitingTasks_

edm::WaitingTaskList edm::Worker::waitingTasks_
private

◆ workStarted_

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 568 of file Worker.h.

Referenced by doWork(), doWorkAsync(), doWorkNoPrefetchingAsync(), and reset().

edm::ModuleCallingContext::state
State state() const
Definition: ModuleCallingContext.h:51
edm::ModuleDescription::moduleLabel
std::string const & moduleLabel() const
Definition: ModuleDescription.h:43
edm::Worker::itemsToGetForSelection
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
edm::Worker::implDoPrePrefetchSelection
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
edm::exceptionContext
void exceptionContext(std::ostream &, GlobalContext const &)
Definition: GlobalContext.cc:71
edm::ProductResolverIndex
unsigned int ProductResolverIndex
Definition: ProductResolverIndex.h:8
mps_fire.i
i
Definition: mps_fire.py:428
edm::Worker::esPrefetchAsync
void esPrefetchAsync(WaitingTask *, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:247
edm::ModuleCallingContext::State::kPrefetching
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::Worker::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:563
edm::Worker::implEndJob
virtual void implEndJob()=0
ActivityRegistry
Timestamp
edm::exception_actions::Rethrow
Definition: ExceptionActions.h:11
edm::Worker::timesPassed_
std::atomic< int > timesPassed_
Definition: Worker.h:551
edm::Worker::Fail
Definition: Worker.h:90
edm::Worker::preActionBeforeRunEventAsync
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::Worker::timesVisited_
std::atomic< int > timesVisited_
Definition: Worker.h:550
edm::Worker::Exception
Definition: Worker.h:90
edm::WaitingTaskList::add
void add(WaitingTask *)
Adds task to the waiting list.
Definition: WaitingTaskList.cc:90
edm::ModuleDescription::moduleName
std::string const & moduleName() const
Definition: ModuleDescription.h:42
edm::Worker::actions_
ExceptionToActionTable const * actions_
Definition: Worker.h:560
edm::Worker::implBeginJob
virtual void implBeginJob()=0
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
edm::ParentContext::Type::kStream
cms::cuda::assert
assert(be >=bs)
edm::Worker::numberOfPathsLeftToRun_
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:556
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::ParentContext::Type::kGlobal
edm::Worker::kProducer
Definition: Worker.h:91
edm::ParentContext::Type::kPlaceInPath
edm::ModuleCallingContext::State::kInvalid
edm::ModuleCallingContext::moduleDescription
ModuleDescription const * moduleDescription() const
Definition: ModuleCallingContext.h:50
edm::esTaskArena
tbb::task_arena & esTaskArena()
Definition: esTaskArenas.cc:8
groupFilesInBlocks.temp
list temp
Definition: groupFilesInBlocks.py:142
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::Worker::implEndStream
virtual void implEndStream(StreamID)=0
edm::Worker::workStarted_
std::atomic< bool > workStarted_
Definition: Worker.h:568
edm::ModuleCallingContext::parent
ParentContext const & parent() const
Definition: ModuleCallingContext.h:53
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
edm::WaitingTaskList::reset
void reset()
Resets access to the resource so that added tasks will wait.
Definition: WaitingTaskList.cc:52
edm::Worker::implRespondToCloseInputFile
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
UNLIKELY
#define UNLIKELY(x)
Definition: Likely.h:21
edm::Worker::kFilter
Definition: Worker.h:91
edm::Worker::edPrefetchAsync
void edPrefetchAsync(WaitingTask *, ServiceToken const &, Principal const &) const
Definition: Worker.cc:326
edm::Worker::earlyDeleteHelper_
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:565
WaitingTask
ESRecordIndex
edm::Worker::timesRun_
std::atomic< int > timesRun_
Definition: Worker.h:549
edm::Worker::kAnalyzer
Definition: Worker.h:91
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::StreamContext::Transition::kBeginStream
edm::ParentContext::Type::kInternal
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::Worker::state_
std::atomic< State > state_
Definition: Worker.h:554
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
edm::WaitingTaskList::doneWaiting
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Definition: WaitingTaskList.cc:170
WaitingTaskHolder
edm::InEvent
Definition: BranchType.h:11
edm::Worker::Pass
Definition: Worker.h:90
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
edm::ParentContext::Type::kModule
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::Worker::timesExcept_
std::atomic< int > timesExcept_
Definition: Worker.h:553
createBeamHaloJobs.queue
queue
Definition: createBeamHaloJobs.py:343
edm::Worker::runAcquire
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:451
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
edm::Worker::serializeRunModule
virtual TaskQueueAdaptor serializeRunModule()=0
edm::Worker::implDoAcquire
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
edm::Worker::waitingTasks_
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:567
edm::LuminosityBlockIndex::invalidLuminosityBlockIndex
static LuminosityBlockIndex invalidLuminosityBlockIndex()
Definition: LuminosityBlockIndex.cc:9
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:30
edm::ModuleCallingContext::previousModuleOnThread
ModuleCallingContext const * previousModuleOnThread() const
Definition: ModuleCallingContext.h:75
edm::Worker::implRegisterThinnedAssociations
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
edm::ServiceRegistry::presentToken
ServiceToken presentToken() const
Definition: ServiceRegistry.cc:63
edm::ModuleCallingContext::getStreamContext
StreamContext const * getStreamContext() const
Definition: ModuleCallingContext.cc:32
edm::ModuleCallingContext::setState
void setState(State state)
Definition: ModuleCallingContext.h:48
Exception
helper
Definition: helper.py:1
edm::Worker::prePrefetchSelectionAsync
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:204
edm::ModuleCallingContext::setContext
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
Definition: ModuleCallingContext.cc:24
iEvent
int iEvent
Definition: GenABIO.cc:224
edm::Worker::ranAcquireWithoutException_
bool ranAcquireWithoutException_
Definition: Worker.h:569
edm::Worker::exceptionContext
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:112
edm::Worker::implRespondToOpenInputFile
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
edm::Worker::numberOfPathsOn_
int numberOfPathsOn_
Definition: Worker.h:555
writedatasetfile.action
action
Definition: writedatasetfile.py:8
edm::Worker::moduleCallingContext_
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:558
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::Worker::timesFailed_
std::atomic< int > timesFailed_
Definition: Worker.h:552
edm::Worker::description
ModuleDescription const & description() const
Definition: Worker.h:188
edm::Worker::state
State state() const
Definition: Worker.h:230
edm::Worker::cached_exception_
std::exception_ptr cached_exception_
Definition: Worker.h:561
edm::ServiceRegistry::instance
static ServiceRegistry & instance()
Definition: ServiceRegistry.cc:90
edm::RunIndex::invalidRunIndex
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
edm::Worker::needsESPrefetching
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:333
edm::Worker::esRecordsToGetFrom
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
edm::make_empty_waiting_task
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
Definition: WaitingTaskList.h:96
edm::Worker::timesPassed
int timesPassed() const
Definition: Worker.h:227
edm::ProductResolverIndexAmbiguous
Definition: ProductResolverIndex.h:18
edm::Worker::itemsToGetFrom
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
cms::cuda::device::unique_ptr
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
Definition: device_unique_ptr.h:33
edm::Worker::hasAcquire
virtual bool hasAcquire() const =0
edm::WaitingTask
Definition: WaitingTask.h:36
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::Worker::esItemsToGetFrom
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
or
The Signals That Services Can Subscribe To. This is based on ActivityRegistry and is current per … Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application. Each possible callback has some defined parameters, which we here list in angle brackets, e.g. < void, edm::EventID const &, edm::Timestamp const & >. We also list in braces which AR_WATCH_USING_METHOD_ is used for those or …
Definition: Activities.doc:12
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
edm::Worker::implBeginStream
virtual void implBeginStream(StreamID)=0
edm::Worker::Ready
Definition: Worker.h:90
edm::Worker::emitPostModuleEventPrefetchingSignal
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:337
cms::Exception
Definition: Exception.h:70
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::exception_actions::IgnoreCompletely
Definition: ExceptionActions.h:11
cms::Exception::category
std::string const & category() const
Definition: Exception.cc:143
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::Worker::kOutputModule
Definition: Worker.h:91
benchmark_cfg.fb
fb
Definition: benchmark_cfg.py:14
edm::StreamContext::Transition::kEndStream
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
edm::Worker::shouldRethrowException
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:164
edm::Transition::NumberOfEventSetupTransitions
edm::ModuleCallingContext::State::kRunning
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:318