CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference (abstract)

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual SerialTaskQueue * globalLuminosityBlocksQueue ()=0
 
virtual SerialTaskQueue * globalRunsQueue ()=0
 
virtual bool hasAccumulator () const =0
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual void updateLookup (eventsetup::ESRecordsToProxyIndices const &)=0
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual void implDoAcquire (EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
std::exception_ptr handleExternalWorkException (std::exception_ptr const *iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
 
void runAcquire (EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 83 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 85 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 86 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 83 of file Worker.cc.

84  : timesRun_(0),
85  timesVisited_(0),
86  timesPassed_(0),
87  timesFailed_(0),
88  timesExcept_(0),
89  state_(Ready),
93  actions_(iActions),
95  actReg_(),
96  earlyDeleteHelper_(nullptr),
97  workStarted_(false),
std::atomic< int > timesVisited_
Definition: Worker.h:601
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
std::atomic< int > timesExcept_
Definition: Worker.h:604
std::atomic< bool > workStarted_
Definition: Worker.h:619
std::atomic< int > timesFailed_
Definition: Worker.h:603
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
ExceptionToActionTable const * actions_
Definition: Worker.h:611
bool ranAcquireWithoutException_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
std::exception_ptr cached_exception_
Definition: Worker.h:612
std::atomic< State > state_
Definition: Worker.h:605
int numberOfPathsOn_
Definition: Worker.h:606
std::atomic< int > timesPassed_
Definition: Worker.h:602
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616
std::atomic< int > timesRun_
Definition: Worker.h:600
edm::Worker::~Worker ( )
virtual

Definition at line 100 of file Worker.cc.

100 {}
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 284 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo(), and edm::WorkerT< T >::implDoAcquire().

284 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
void edm::Worker::addedToPath ( )
inline

Definition at line 225 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

225 { ++numberOfPathsOn_; }
int numberOfPathsOn_
Definition: Worker.h:606
void edm::Worker::beginJob ( void  )

Definition at line 284 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

284  {
285  try {
286  convertException::wrap([&]() {
287  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
288  implBeginJob();
289  });
290  } catch (cms::Exception& ex) {
291  state_ = Exception;
292  std::ostringstream ost;
293  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
294  ex.addContext(ost.str());
295  throw;
296  }
297  }
ModuleDescription const & description() const
Definition: Worker.h:190
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:605
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:165
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext &  streamContext 
)

Definition at line 314 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

314  {
315  try {
316  convertException::wrap([&]() {
317  streamContext.setTransition(StreamContext::Transition::kBeginStream);
318  streamContext.setEventID(EventID(0, 0, 0));
319  streamContext.setRunIndex(RunIndex::invalidRunIndex());
320  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
321  streamContext.setTimestamp(Timestamp());
322  ParentContext parentContext(&streamContext);
323  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
325  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
326  implBeginStream(id);
327  });
328  } catch (cms::Exception& ex) {
329  state_ = Exception;
330  std::ostringstream ost;
331  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel()
332  << "'";
333  ex.addContext(ost.str());
334  throw;
335  }
336  }
ModuleDescription const & description() const
Definition: Worker.h:190
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:605
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask *  task)
inline

Definition at line 167 of file Worker.h.

References bk::beginJob().

Referenced by edm::SwitchProducerProductResolver::prefetchAsync_().

167 { waitingTasks_.add(task); }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
void edm::Worker::clearCounters ( )
inline

Definition at line 217 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

217  {
218  timesRun_.store(0, std::memory_order_release);
219  timesVisited_.store(0, std::memory_order_release);
220  timesPassed_.store(0, std::memory_order_release);
221  timesFailed_.store(0, std::memory_order_release);
222  timesExcept_.store(0, std::memory_order_release);
223  }
std::atomic< int > timesVisited_
Definition: Worker.h:601
std::atomic< int > timesExcept_
Definition: Worker.h:604
std::atomic< int > timesFailed_
Definition: Worker.h:603
std::atomic< int > timesPassed_
Definition: Worker.h:602
std::atomic< int > timesRun_
Definition: Worker.h:600
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 191 of file Worker.h.

References modifiedElectrons_cfi::processName, and AlCaHLTBitMon_QueryRunRegistry::string.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetupImpl const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 961 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), createBeamHaloJobs::queue, Ready, and edm::ModuleCallingContext::setContext().

965  {
966  if (T::isEvent_) {
967  timesVisited_.fetch_add(1, std::memory_order_relaxed);
968  }
969  bool rc = false;
970 
971  switch (state_) {
972  case Ready:
973  break;
974  case Pass:
975  return true;
976  case Fail:
977  return false;
978  case Exception: {
979  std::rethrow_exception(cached_exception_);
980  }
981  }
982 
983  bool expected = false;
984  if (not workStarted_.compare_exchange_strong(expected, true)) {
985  //another thread beat us here
986  auto waitTask = edm::make_empty_waiting_task();
987  waitTask->increment_ref_count();
988 
989  waitingTasks_.add(waitTask.get());
990 
991  waitTask->wait_for_all();
992 
993  switch (state_) {
994  case Ready:
995  assert(false);
996  case Pass:
997  return true;
998  case Fail:
999  return false;
1000  case Exception: {
1001  std::rethrow_exception(cached_exception_);
1002  }
1003  }
1004  }
1005 
1006  //Need the context to be set until after any exception is resolved
1008 
1009  auto resetContext = [](ModuleCallingContext* iContext) {
1010  iContext->setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1011  };
1012  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_, resetContext);
1013 
1014  if (T::isEvent_) {
1015  //if have TriggerResults based selection we want to reject the event before doing prefetching
1016  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1017  auto waitTask = edm::make_empty_waiting_task();
1018  waitTask->set_ref_count(2);
1019  prePrefetchSelectionAsync(waitTask.get(), ServiceRegistry::instance().presentToken(), streamID, &ep);
1020  waitTask->decrement_ref_count();
1021  waitTask->wait_for_all();
1022 
1023  if (state() != Ready) {
1024  //The selection must have rejected this event
1025  return true;
1026  }
1027  }
1028  auto waitTask = edm::make_empty_waiting_task();
1029  {
1030  //Make sure signal is sent once the prefetching is done
1031  // [the 'pre' signal was sent in prefetchAsync]
1032  //The purpose of this block is to send the signal after wait_for_all
1033  auto sentryFunc = [this](void*) { emitPostModuleEventPrefetchingSignal(); };
1034  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(), sentryFunc);
1035 
1036  //set count to 2 since wait_for_all requires value to not go to 0
1037  waitTask->set_ref_count(2);
1038 
1039  prefetchAsync(waitTask.get(), ServiceRegistry::instance().presentToken(), parentContext, ep);
1040  waitTask->decrement_ref_count();
1041  waitTask->wait_for_all();
1042  }
1043  if (waitTask->exceptionPtr() != nullptr) {
1044  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
1045  if (shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1046  setException<T::isEvent_>(*waitTask->exceptionPtr());
1048  std::rethrow_exception(cached_exception_);
1049  } else {
1050  setPassed<T::isEvent_>();
1051  waitingTasks_.doneWaiting(nullptr);
1052  return true;
1053  }
1054  }
1055  }
1056 
1057  //successful prefetch so no reset necessary
1058  prefetchSentry.release();
1059  if (auto queue = serializeRunModule()) {
1060  auto serviceToken = ServiceRegistry::instance().presentToken();
1061  queue.pushAndWait([&]() {
1062  //Need to make the services available
1063  ServiceRegistry::Operate guard(serviceToken);
1064  try {
1065  rc = runModule<T>(ep, es, streamID, parentContext, context);
1066  } catch (...) {
1067  }
1068  });
1069  } else {
1070  try {
1071  rc = runModule<T>(ep, es, streamID, parentContext, context);
1072  } catch (...) {
1073  }
1074  }
1075  if (state_ == Exception) {
1077  std::rethrow_exception(cached_exception_);
1078  }
1079 
1080  waitingTasks_.doneWaiting(nullptr);
1081  return rc;
1082  }
std::atomic< int > timesVisited_
Definition: Worker.h:601
std::atomic< bool > workStarted_
Definition: Worker.h:619
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
State state() const
Definition: Worker.h:232
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
std::exception_ptr cached_exception_
Definition: Worker.h:612
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:156
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:605
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:199
virtual TaskQueueAdaptor serializeRunModule()=0
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:232
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:374
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask *  task,
typename T::MyPrincipal const &  ep,
EventSetupImpl const &  c,
ServiceToken const &  token,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 827 of file Worker.h.

References pyrootRender::destroy(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), eostools::move(), AlCaHLTBitMon_ParallelJobs::p, fetchall_from_DQM_v2::release, edm::ModuleCallingContext::setContext(), and protons_cff::t.

Referenced by edm::WorkerInPath::runWorkerAsync().

833  {
834  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
835  return;
836  }
837 
838  waitingTasks_.add(task);
839  if (T::isEvent_) {
840  timesVisited_.fetch_add(1, std::memory_order_relaxed);
841  }
842 
843  bool expected = false;
844  if (workStarted_.compare_exchange_strong(expected, true)) {
846 
847  //if have TriggerResults based selection we want to reject the event before doing prefetching
848  if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
849  //We need to run the selection in a different task so that
850  // we can prefetch the data needed for the selection
851  auto runTask =
852  new (tbb::task::allocate_root()) RunModuleTask<T>(this, ep, es, token, streamID, parentContext, context);
853 
854  //make sure the task is either run or destroyed
855  struct DestroyTask {
856  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
857 
858  ~DestroyTask() {
859  auto p = m_task.load();
860  if (p) {
862  }
863  }
864 
866  auto t = m_task.load();
867  m_task.store(nullptr);
868  return t;
869  }
870 
871  std::atomic<edm::WaitingTask*> m_task;
872  };
873 
874  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
875  auto selectionTask =
876  make_waiting_task(tbb::task::allocate_root(),
877  [ownRunTask, parentContext, &ep, token, this](std::exception_ptr const*) mutable {
878  ServiceRegistry::Operate guard(token);
879  prefetchAsync(ownRunTask->release(), token, parentContext, ep);
880  });
881  prePrefetchSelectionAsync(selectionTask, token, streamID, &ep);
882  } else {
883  WaitingTask* moduleTask =
884  new (tbb::task::allocate_root()) RunModuleTask<T>(this, ep, es, token, streamID, parentContext, context);
885  if (T::isEvent_ && hasAcquire()) {
886  WaitingTaskWithArenaHolder runTaskHolder(
887  new (tbb::task::allocate_root()) HandleExternalWorkExceptionTask(this, moduleTask, parentContext));
888  moduleTask = new (tbb::task::allocate_root())
889  AcquireTask<T>(this, ep, es, token, parentContext, std::move(runTaskHolder));
890  }
891  prefetchAsync(moduleTask, token, parentContext, ep);
892  }
893  }
894  }
std::atomic< int > timesVisited_
Definition: Worker.h:601
def destroy(e)
Definition: pyrootRender.py:15
std::atomic< bool > workStarted_
Definition: Worker.h:619
void add(WaitingTask *)
Adds task to the waiting list.
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
virtual bool hasAcquire() const =0
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:199
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:232
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
def move(src, dest)
Definition: eostools.py:511
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask *  task,
typename T::MyPrincipal const &  principal,
EventSetupImpl const &  c,
ServiceToken const &  token,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 926 of file Worker.h.

References edm::make_functor_task(), cmsRelvalreport::principal(), and createBeamHaloJobs::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

932  {
933  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
934  return;
935  }
936  waitingTasks_.add(task);
937  bool expected = false;
938  if (workStarted_.compare_exchange_strong(expected, true)) {
939  auto toDo = [this, &principal, &es, streamID, parentContext, context, serviceToken]() {
940  std::exception_ptr exceptionPtr;
941  try {
942  //Need to make the services available
943  ServiceRegistry::Operate guard(serviceToken);
944 
945  this->runModule<T>(principal, es, streamID, parentContext, context);
946  } catch (...) {
947  exceptionPtr = std::current_exception();
948  }
949  this->waitingTasks_.doneWaiting(exceptionPtr);
950  };
951  if (auto queue = this->serializeRunModule()) {
952  queue.push(toDo);
953  } else {
954  auto task = make_functor_task(tbb::task::allocate_root(), toDo);
955  tbb::task::spawn(*task);
956  }
957  }
958  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:619
void add(WaitingTask *)
Adds task to the waiting list.
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
virtual TaskQueueAdaptor serializeRunModule()=0
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 374 of file Worker.h.

374  {
375  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
376  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
void edm::Worker::endJob ( void  )

Definition at line 299 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

299  {
300  try {
301  convertException::wrap([&]() {
302  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
303  implEndJob();
304  });
305  } catch (cms::Exception& ex) {
306  state_ = Exception;
307  std::ostringstream ost;
308  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
309  ex.addContext(ost.str());
310  throw;
311  }
312  }
ModuleDescription const & description() const
Definition: Worker.h:190
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:605
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:165
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext &  streamContext 
)

Definition at line 338 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

338  {
339  try {
340  convertException::wrap([&]() {
341  streamContext.setTransition(StreamContext::Transition::kEndStream);
342  streamContext.setEventID(EventID(0, 0, 0));
343  streamContext.setRunIndex(RunIndex::invalidRunIndex());
344  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
345  streamContext.setTimestamp(Timestamp());
346  ParentContext parentContext(&streamContext);
347  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
349  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
350  implEndStream(id);
351  });
352  } catch (cms::Exception& ex) {
353  state_ = Exception;
354  std::ostringstream ost;
355  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel()
356  << "'";
357  ex.addContext(ost.str());
358  throw;
359  }
360  }
ModuleDescription const & description() const
Definition: Worker.h:190
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:605
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception & ex,
ModuleCallingContext const *  mcc 
)
static private

Definition at line 104 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

Referenced by handleExternalWorkException(), and runAcquire().

104  {
105  ModuleCallingContext const* imcc = mcc;
106  while ((imcc->type() == ParentContext::Type::kModule) or (imcc->type() == ParentContext::Type::kInternal)) {
107  std::ostringstream iost;
108  if (imcc->state() == ModuleCallingContext::State::kPrefetching) {
109  iost << "Prefetching for module ";
110  } else {
111  iost << "Calling method for module ";
112  }
113  iost << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if (imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
124  if (imcc->state() == ModuleCallingContext::State::kPrefetching) {
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
130  ex.addContext(ost.str());
131 
132  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
133  ost.str("");
134  ost << "Running path '";
135  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
136  ex.addContext(ost.str());
137  auto streamContext = imcc->placeInPathContext()->pathContext()->streamContext();
138  if (streamContext) {
139  ost.str("");
140  edm::exceptionContext(ost, *streamContext);
141  ex.addContext(ost.str());
142  }
143  } else {
144  if (imcc->type() == ParentContext::Type::kStream) {
145  ost.str("");
146  edm::exceptionContext(ost, *(imcc->streamContext()));
147  ex.addContext(ost.str());
148  } else if (imcc->type() == ParentContext::Type::kGlobal) {
149  ost.str("");
150  edm::exceptionContext(ost, *(imcc->globalContext()));
151  ex.addContext(ost.str());
152  }
153  }
154  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual SerialTaskQueue* edm::Worker::globalLuminosityBlocksQueue ( )
pure virtual
virtual SerialTaskQueue* edm::Worker::globalRunsQueue ( )
pure virtual
std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr const *  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 417 of file Worker.cc.

References exceptionContext(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

Referenced by edm::Worker::HandleExternalWorkExceptionTask::execute().

418  {
420  try {
421  convertException::wrap([iEPtr]() { std::rethrow_exception(*iEPtr); });
422  } catch (cms::Exception& ex) {
423  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
425  return std::current_exception();
426  }
427  }
428  return *iEPtr;
429  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:104
bool ranAcquireWithoutException_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
auto wrap(F iFunc) -> decltype(iFunc())
virtual bool edm::Worker::hasAccumulator ( ) const
pure virtual
virtual bool edm::Worker::hasAcquire ( ) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implBeginJob ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual void edm::Worker::implDoAcquire ( EventPrincipal const &  ,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc,
WaitingTaskWithArenaHolder & holder 
)
protected pure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetupImpl const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protected pure virtual
virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper &  
)
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protected pure virtual
virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 368 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

368  {
369  if (earlyDeleteHelper_) {
370  earlyDeleteHelper_->moduleRan(iEvent);
371  }
372  }
int iEvent
Definition: GenABIO.cc:224
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask * iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask * iTask,
ServiceToken const &  token,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 199 of file Worker.cc.

References actReg_, edm::Principal::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, mps_monitormerge::items, itemsToGetFrom(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

202  {
203  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
204  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
205 
207 
208  if (iPrincipal.branchType() == InEvent) {
209  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
210  }
211 
212  //Need to be sure the ref count isn't set to 0 immediately
213  iTask->increment_ref_count();
214  for (auto const& item : items) {
215  ProductResolverIndex productResolverIndex = item.productResolverIndex();
216  bool skipCurrentProcess = item.skipCurrentProcess();
217  if (productResolverIndex != ProductResolverIndexAmbiguous) {
218  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
219  }
220  }
221 
222  if (iPrincipal.branchType() == InEvent) {
224  }
225 
226  if (0 == iTask->decrement_ref_count()) {
227  //if everything finishes before we leave this routine, we need to launch the task
228  tbb::task::spawn(*iTask);
229  }
230  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::prePrefetchSelectionAsync ( WaitingTask * task,
ServiceToken const &  token,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 232 of file Worker.cc.

References pyrootRender::destroy(), edm::WaitingTaskList::doneWaiting(), implDoPrePrefetchSelection(), mps_monitormerge::items, itemsToGetForSelection(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, timesRun_, and waitingTasks_.

235  {
236  successTask->increment_ref_count();
237 
238  auto choiceTask = edm::make_waiting_task(
239  tbb::task::allocate_root(), [id, successTask, iPrincipal, this, token](std::exception_ptr const*) {
240  ServiceRegistry::Operate guard(token);
241  try {
242  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
243  timesRun_.fetch_add(1, std::memory_order_relaxed);
244  setPassed<true>();
245  waitingTasks_.doneWaiting(nullptr);
246  //TBB requires that destroyed tasks have count 0
247  if (0 == successTask->decrement_ref_count()) {
248  tbb::task::destroy(*successTask);
249  }
250  return;
251  }
252  } catch (...) {
253  }
254  if (0 == successTask->decrement_ref_count()) {
255  tbb::task::spawn(*successTask);
256  }
257  });
258 
259  WaitingTaskHolder choiceHolder{choiceTask};
260 
261  std::vector<ProductResolverIndexAndSkipBit> items;
262  itemsToGetForSelection(items);
263 
264  for (auto const& item : items) {
265  ProductResolverIndex productResolverIndex = item.productResolverIndex();
266  bool skipCurrentProcess = item.skipCurrentProcess();
267  if (productResolverIndex != ProductResolverIndexAmbiguous) {
268  iPrincipal->prefetchAsync(choiceTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
269  }
270  }
271  choiceHolder.doneWaiting(std::exception_ptr{});
272  }
unsigned int ProductResolverIndex
def destroy(e)
Definition: pyrootRender.py:15
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::atomic< int > timesRun_
Definition: Worker.h:600
void edm::Worker::prePrefetchSelectionAsync ( WaitingTask * task,
ServiceToken const &  ,
StreamID  stream,
void const *   
)
inline

Definition at line 138 of file Worker.h.

138  {
139  assert(false);
140  }
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper & helper 
)
inline

Definition at line 176 of file Worker.h.

176  {
178  }
Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 180 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

180  {
181  cached_exception_ = std::exception_ptr();
182  state_ = Ready;
184  workStarted_ = false;
186  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
std::atomic< bool > workStarted_
Definition: Worker.h:619
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:612
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
std::atomic< State > state_
Definition: Worker.h:605
int numberOfPathsOn_
Definition: Worker.h:606
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 174 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 173 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
void edm::Worker::runAcquire ( EventPrincipal const &  ep,
EventSetupImpl const &  es,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder & holder 
)
private

Definition at line 374 of file Worker.cc.

References exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

377  {
378  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
379  try {
380  convertException::wrap([&]() { this->implDoAcquire(ep, es, &moduleCallingContext_, holder); });
381  } catch (cms::Exception& ex) {
383  TransitionIDValue<EventPrincipal> idValue(ep);
384  if (shouldRethrowException(std::current_exception(), parentContext, true, idValue)) {
385  timesRun_.fetch_add(1, std::memory_order_relaxed);
386  throw;
387  }
388  }
389  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:104
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:156
virtual void implDoAcquire(EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:600
void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
EventPrincipal const &  ep,
EventSetupImpl const &  es,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 391 of file Worker.cc.

References edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), and shouldRethrowException().

395  {
397  std::exception_ptr exceptionPtr;
398  if (iEPtr) {
399  assert(*iEPtr);
400  TransitionIDValue<EventPrincipal> idValue(ep);
401  if (shouldRethrowException(*iEPtr, parentContext, true, idValue)) {
402  exceptionPtr = *iEPtr;
403  }
405  } else {
406  try {
407  runAcquire(ep, es, parentContext, holder);
409  } catch (...) {
410  exceptionPtr = std::current_exception();
411  }
412  }
413  // It is important this is after runAcquire completely finishes
414  holder.doneWaiting(exceptionPtr);
415  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:156
void runAcquire(EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
Definition: Worker.cc:374
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetupImpl const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1085 of file Worker.h.

References edm::exceptionContext(), and edm::convertException::wrap().

1089  {
1090  //unscheduled producers should advance this
1091  //if (T::isEvent_) {
1092  // ++timesVisited_;
1093  //}
1094  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1095  if (T::isEvent_) {
1096  timesRun_.fetch_add(1, std::memory_order_relaxed);
1097  }
1098 
1099  bool rc = true;
1100  try {
1101  convertException::wrap([&]() {
1102  rc = workerhelper::CallImpl<T>::call(this, streamID, ep, es, actReg_.get(), &moduleCallingContext_, context);
1103 
1104  if (rc) {
1105  setPassed<T::isEvent_>();
1106  } else {
1107  setFailed<T::isEvent_>();
1108  }
1109  });
1110  } catch (cms::Exception& ex) {
1112  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
1113  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1114  assert(not cached_exception_);
1115  setException<T::isEvent_>(std::current_exception());
1116  std::rethrow_exception(cached_exception_);
1117  } else {
1118  rc = setPassed<T::isEvent_>();
1119  }
1120  }
1121 
1122  return rc;
1123  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:104
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
std::exception_ptr cached_exception_
Definition: Worker.h:612
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:156
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:600
template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetupImpl const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 897 of file Worker.h.

References edm::ModuleCallingContext::kInvalid, and edm::ModuleCallingContext::setContext().

902  {
903  std::exception_ptr exceptionPtr;
904  if (iEPtr) {
905  assert(*iEPtr);
906  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
907  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
908  exceptionPtr = *iEPtr;
909  setException<T::isEvent_>(exceptionPtr);
910  } else {
911  setPassed<T::isEvent_>();
912  }
914  } else {
915  try {
916  runModule<T>(ep, es, streamID, parentContext, context);
917  } catch (...) {
918  exceptionPtr = std::current_exception();
919  }
920  }
921  waitingTasks_.doneWaiting(exceptionPtr);
922  return exceptionPtr;
923  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:156
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::MyPrincipal const &  ep,
EventSetupImpl const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1126 of file Worker.h.

Referenced by edm::Path::finished().

1130  {
1131  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1132  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1133  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
1134  }
std::atomic< int > timesVisited_
Definition: Worker.h:601
virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
private pure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry > areg )

The signals are required to live longer than the last call to 'doWork'; this was done to improve performance based on profiling.

Definition at line 102 of file Worker.cc.

References actReg_.

102 { actReg_ = areg; }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper * iHelper )

Definition at line 274 of file Worker.cc.

References earlyDeleteHelper_.

274 { earlyDeleteHelper_ = iHelper; }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inline private

Definition at line 363 of file Worker.h.

References Exception.

363  {
364  if (IS_EVENT) {
365  timesExcept_.fetch_add(1, std::memory_order_relaxed);
366  }
367  cached_exception_ = iException; // propagate_const<T> has no reset() function
368  state_ = Exception;
369  return cached_exception_;
370  }
std::atomic< int > timesExcept_
Definition: Worker.h:604
std::exception_ptr cached_exception_
Definition: Worker.h:612
std::atomic< State > state_
Definition: Worker.h:605
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inline private

Definition at line 354 of file Worker.h.

References Fail.

354  {
355  if (IS_EVENT) {
356  timesFailed_.fetch_add(1, std::memory_order_relaxed);
357  }
358  state_ = Fail;
359  return false;
360  }
std::atomic< int > timesFailed_
Definition: Worker.h:603
std::atomic< State > state_
Definition: Worker.h:605
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inline private

Definition at line 345 of file Worker.h.

References Pass.

345  {
346  if (IS_EVENT) {
347  timesPassed_.fetch_add(1, std::memory_order_relaxed);
348  }
349  state_ = Pass;
350  return true;
351  }
std::atomic< State > state_
Definition: Worker.h:605
std::atomic< int > timesPassed_
Definition: Worker.h:602
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 156 of file Worker.cc.

References writedatasetfile::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

Referenced by runAcquire(), and runAcquireAfterAsyncPrefetch().

159  {
160  // NOTE: the warning printed as a result of ignoring or failing
161  // a module will only be printed during the full true processing
162  // pass of this module
163 
164  // Get the action corresponding to this exception. However, if processing
165  // something other than an event (e.g. run, lumi) always rethrow.
166  if (not isEvent) {
167  return true;
168  }
169  try {
170  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
171  } catch (cms::Exception& ex) {
173 
174  if (action == exception_actions::Rethrow) {
175  return true;
176  }
177 
178  ModuleCallingContext tempContext(&description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
179 
180  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
181  // as IgnoreCompletely, so any subsequent OutputModules are still run.
182  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
183  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
184  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
185  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
186  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
187  action == exception_actions::FailPath) {
189  }
190  }
191  if (action == exception_actions::IgnoreCompletely) {
192  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
193  return false;
194  }
195  }
196  return true;
197  }
ModuleDescription const & description() const
Definition: Worker.h:190
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:611
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 362 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

362  {
363  if (0 == --numberOfPathsLeftToRun_) {
365  }
366  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
std::exception_ptr cached_exception_
Definition: Worker.h:612
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
State edm::Worker::state ( ) const
inline

Definition at line 232 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

232 { return state_; }
std::atomic< State > state_
Definition: Worker.h:605
int edm::Worker::timesExcept ( ) const
inline

Definition at line 231 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

231 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:604
int edm::Worker::timesFailed ( ) const
inline

Definition at line 230 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

230 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:603
int edm::Worker::timesPass ( ) const
inline

Definition at line 234 of file Worker.h.

234 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:229
int edm::Worker::timesPassed ( ) const
inline

Definition at line 229 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

229 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:602
int edm::Worker::timesRun ( ) const
inline

Definition at line 227 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

227 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:600
int edm::Worker::timesVisited ( ) const
inline

Definition at line 228 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

228 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:601
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual void edm::Worker::updateLookup ( eventsetup::ESRecordsToProxyIndices const &  )
pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual
virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual
virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual
virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual
virtual std::string edm::Worker::workerType ( ) const
protected pure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 240 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 611 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 614 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 612 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 616 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private
std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 607 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 606 of file Worker.h.

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 620 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

std::atomic<State> edm::Worker::state_
private

Definition at line 605 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 604 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 603 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 602 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 600 of file Worker.h.

Referenced by prePrefetchSelectionAsync(), and runAcquire().

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 601 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 618 of file Worker.h.

Referenced by prePrefetchSelectionAsync(), and skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 619 of file Worker.h.