CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference (abstract)

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (WaitingTask *task, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (WaitingTask *task, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implDoAcquire (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
std::exception_ptr handleExternalWorkException (std::exception_ptr const *iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ParentContext const &parentContext, Principal const &)
 
void runAcquire (EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 79 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 81 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 82 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 73 of file Worker.cc.

74  :
75  timesRun_(0),
76  timesVisited_(0),
77  timesPassed_(0),
78  timesFailed_(0),
79  timesExcept_(0),
80  state_(Ready),
84  actions_(iActions),
86  actReg_(),
87  earlyDeleteHelper_(nullptr),
88  workStarted_(false),
90  {
91  }
std::atomic< int > timesVisited_
Definition: Worker.h:572
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:578
std::atomic< int > timesExcept_
Definition: Worker.h:575
std::atomic< bool > workStarted_
Definition: Worker.h:590
std::atomic< int > timesFailed_
Definition: Worker.h:574
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
ExceptionToActionTable const * actions_
Definition: Worker.h:582
bool ranAcquireWithoutException_
Definition: Worker.h:591
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
std::exception_ptr cached_exception_
Definition: Worker.h:583
std::atomic< State > state_
Definition: Worker.h:576
int numberOfPathsOn_
Definition: Worker.h:577
std::atomic< int > timesPassed_
Definition: Worker.h:573
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:587
std::atomic< int > timesRun_
Definition: Worker.h:571
edm::Worker::~Worker ( )
virtual

Definition at line 93 of file Worker.cc.

93  {
94  }
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 266 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo(), and edm::WorkerT< T >::implDoAcquire().

266 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
void edm::Worker::addedToPath ( )
inline

Definition at line 214 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

214  {
216  }
int numberOfPathsOn_
Definition: Worker.h:577
void edm::Worker::beginJob ( void  )

Definition at line 291 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

291  {
292  try {
293  convertException::wrap([&]() {
294  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
295  implBeginJob();
296  });
297  }
298  catch(cms::Exception& ex) {
299  state_ = Exception;
300  std::ostringstream ost;
301  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
302  ex.addContext(ost.str());
303  throw;
304  }
305  }
ModuleDescription const & description() const
Definition: Worker.h:182
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:576
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext & streamContext 
)

Definition at line 323 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

323  {
324  try {
325  convertException::wrap([&]() {
326  streamContext.setTransition(StreamContext::Transition::kBeginStream);
327  streamContext.setEventID(EventID(0, 0, 0));
328  streamContext.setRunIndex(RunIndex::invalidRunIndex());
329  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
330  streamContext.setTimestamp(Timestamp());
331  ParentContext parentContext(&streamContext);
332  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
334  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
335  implBeginStream(id);
336  });
337  }
338  catch(cms::Exception& ex) {
339  state_ = Exception;
340  std::ostringstream ost;
341  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
342  ex.addContext(ost.str());
343  throw;
344  }
345  }
ModuleDescription const & description() const
Definition: Worker.h:182
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:576
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask * task)
inline

Definition at line 159 of file Worker.h.

References bk::beginJob().

159  {
160  waitingTasks_.add(task);
161  }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
void edm::Worker::clearCounters ( )
inline

Definition at line 206 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

206  {
207  timesRun_.store(0,std::memory_order_release);
208  timesVisited_.store(0,std::memory_order_release);
209  timesPassed_.store(0,std::memory_order_release);
210  timesFailed_.store(0,std::memory_order_release);
211  timesExcept_.store(0,std::memory_order_release);
212  }
std::atomic< int > timesVisited_
Definition: Worker.h:572
std::atomic< int > timesExcept_
Definition: Worker.h:575
std::atomic< int > timesFailed_
Definition: Worker.h:574
std::atomic< int > timesPassed_
Definition: Worker.h:573
std::atomic< int > timesRun_
Definition: Worker.h:571
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 183 of file Worker.h.

References modifiedElectrons_cfi::processName, and AlCaHLTBitMon_QueryRunRegistry::string.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 939 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), createBeamHaloJobs::queue, Ready, and edm::ModuleCallingContext::setContext().

943  {
944 
945  if (T::isEvent_) {
946  timesVisited_.fetch_add(1,std::memory_order_relaxed);
947  }
948  bool rc = false;
949 
950  switch(state_) {
951  case Ready: break;
952  case Pass: return true;
953  case Fail: return false;
954  case Exception: {
955  std::rethrow_exception(cached_exception_);
956  }
957  }
958 
959  bool expected = false;
960  if(not workStarted_.compare_exchange_strong(expected, true) ) {
961  //another thread beat us here
962  auto waitTask = edm::make_empty_waiting_task();
963  waitTask->increment_ref_count();
964 
965  waitingTasks_.add(waitTask.get());
966 
967  waitTask->wait_for_all();
968 
969  switch(state_) {
970  case Ready: assert(false);
971  case Pass: return true;
972  case Fail: return false;
973  case Exception: {
974  std::rethrow_exception(cached_exception_);
975  }
976  }
977  }
978 
979  //Need the context to be set until after any exception is resolved
981 
982  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
983  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
984 
985  if (T::isEvent_) {
986  //if have TriggerResults based selection we want to reject the event before doing prefetching
987  if ( workerhelper::CallImpl<T>::needToRunSelection(this)) {
988  auto waitTask = edm::make_empty_waiting_task();
989  waitTask->set_ref_count(2);
990  prePrefetchSelectionAsync(waitTask.get(), streamID, &ep);
991  waitTask->decrement_ref_count();
992  waitTask->wait_for_all();
993 
994  if(state() != Ready) {
995  //The selection must have rejected this event
996  return true;
997  }
998  }
999  auto waitTask = edm::make_empty_waiting_task();
1000  {
1001  //Make sure signal is sent once the prefetching is done
1002  // [the 'pre' signal was sent in prefetchAsync]
1003  //The purpose of this block is to send the signal after wait_for_all
1004  auto sentryFunc = [this](void*) {
1006  };
1007  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
1008 
1009  //set count to 2 since wait_for_all requires value to not go to 0
1010  waitTask->set_ref_count(2);
1011 
1012  prefetchAsync(waitTask.get(),parentContext, ep);
1013  waitTask->decrement_ref_count();
1014  waitTask->wait_for_all();
1015  }
1016  if(waitTask->exceptionPtr() != nullptr) {
1017  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
1018  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1019  setException<T::isEvent_>(*waitTask->exceptionPtr());
1021  std::rethrow_exception(cached_exception_);
1022  } else {
1023  setPassed<T::isEvent_>();
1024  waitingTasks_.doneWaiting(nullptr);
1025  return true;
1026  }
1027  }
1028  }
1029 
1030  //successful prefetch so no reset necessary
1031  prefetchSentry.release();
1032  if(auto queue = serializeRunModule()) {
1033  auto serviceToken = ServiceRegistry::instance().presentToken();
1034  queue.pushAndWait([&]() {
1035  //Need to make the services available
1036  ServiceRegistry::Operate guard(serviceToken);
1037  try {
1038  rc = runModule<T>(ep,es,streamID,parentContext,context);
1039  } catch(...) {
1040  }
1041  });
1042  } else {
1043  try {
1044  rc = runModule<T>(ep,es,streamID,parentContext,context);
1045  } catch(...) {
1046  }
1047  }
1048  if(state_ == Exception) {
1050  std::rethrow_exception(cached_exception_);
1051  }
1052 
1053  waitingTasks_.doneWaiting(nullptr);
1054  return rc;
1055  }
void prePrefetchSelectionAsync(WaitingTask *task, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:237
std::atomic< int > timesVisited_
Definition: Worker.h:572
std::atomic< bool > workStarted_
Definition: Worker.h:590
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
State state() const
Definition: Worker.h:223
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
std::exception_ptr cached_exception_
Definition: Worker.h:583
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:576
virtual TaskQueueAdaptor serializeRunModule()=0
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:207
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:358
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask * task,
typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 796 of file Worker.h.

References pyrootRender::destroy(), edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), eostools::move(), AlCaHLTBitMon_ParallelJobs::p, edm::ServiceRegistry::presentToken(), fetchall_from_DQM_v2::release, edm::ModuleCallingContext::setContext(), and lumiQTWidget::t.

Referenced by edm::WorkerInPath::runWorkerAsync().

801  {
802  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
803  return;
804  }
805 
806  waitingTasks_.add(task);
807  if(T::isEvent_) {
808  timesVisited_.fetch_add(1,std::memory_order_relaxed);
809  }
810 
811  bool expected = false;
812  if(workStarted_.compare_exchange_strong(expected,true)) {
814 
815  //if have TriggerResults based selection we want to reject the event before doing prefetching
816  if(workerhelper::CallImpl<T>::needToRunSelection(this)) {
817  //We need to run the selection in a different task so that
818  // we can prefetch the data needed for the selection
819  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
820  this, ep,es,streamID,parentContext,context);
821 
822  //make sure the task is either run or destroyed
823  struct DestroyTask {
824  DestroyTask(edm::WaitingTask* iTask):
825  m_task(iTask) {}
826 
827  ~DestroyTask() {
828  auto p = m_task.load();
829  if(p) {
831  }
832  }
833 
835  auto t = m_task.load();
836  m_task.store(nullptr);
837  return t;
838  }
839 
840  std::atomic<edm::WaitingTask*> m_task;
841  };
842 
843  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
844  auto token = ServiceRegistry::instance().presentToken();
845  auto selectionTask = make_waiting_task(tbb::task::allocate_root(), [ownRunTask,parentContext,&ep,token, this] (std::exception_ptr const* ) mutable {
846 
847  ServiceRegistry::Operate guard(token);
848  prefetchAsync(ownRunTask->release(), parentContext, ep);
849  });
850  prePrefetchSelectionAsync(selectionTask,streamID, &ep);
851  } else {
852  WaitingTask* moduleTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
853  this, ep, es, streamID, parentContext, context);
854  if (T::isEvent_ && hasAcquire()) {
855  WaitingTaskWithArenaHolder runTaskHolder(
856  new (tbb::task::allocate_root())
857  HandleExternalWorkExceptionTask(this,
858  moduleTask,
859  parentContext));
860  moduleTask = new (tbb::task::allocate_root()) AcquireTask<T>(
861  this, ep, es, parentContext, std::move(runTaskHolder));
862  }
863  prefetchAsync(moduleTask, parentContext, ep);
864  }
865  }
866  }
void prePrefetchSelectionAsync(WaitingTask *task, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:237
std::atomic< int > timesVisited_
Definition: Worker.h:572
def destroy(e)
Definition: pyrootRender.py:13
std::atomic< bool > workStarted_
Definition: Worker.h:590
void add(WaitingTask *)
Adds task to the waiting list.
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
static ServiceRegistry & instance()
virtual bool hasAcquire() const =0
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:207
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
def move(src, dest)
Definition: eostools.py:510
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask * task,
typename T::MyPrincipal const &  principal,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 898 of file Worker.h.

References edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::ServiceRegistry::presentToken(), cmsRelvalreport::principal(), and createBeamHaloJobs::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

903  {
904  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
905  return;
906  }
907  waitingTasks_.add(task);
908  bool expected = false;
909  if(workStarted_.compare_exchange_strong(expected,true)) {
910  auto serviceToken = ServiceRegistry::instance().presentToken();
911 
912  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
913  {
914  std::exception_ptr exceptionPtr;
915  try {
916  //Need to make the services available
917  ServiceRegistry::Operate guard(serviceToken);
918 
919  this->runModule<T>(principal,
920  es,
921  streamID,
922  parentContext,
923  context);
924  } catch( ... ) {
925  exceptionPtr = std::current_exception();
926  }
927  this->waitingTasks_.doneWaiting(exceptionPtr);
928  };
929  if(auto queue = this->serializeRunModule()) {
930  queue.push( toDo);
931  } else {
932  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
933  tbb::task::spawn(*task);
934  }
935  }
936  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:590
void add(WaitingTask *)
Adds task to the waiting list.
ServiceToken presentToken() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
static ServiceRegistry & instance()
virtual TaskQueueAdaptor serializeRunModule()=0
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 358 of file Worker.h.

358  {
359  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
360  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
void edm::Worker::endJob ( void  )

Definition at line 307 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

307  {
308  try {
309  convertException::wrap([&]() {
310  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
311  implEndJob();
312  });
313  }
314  catch(cms::Exception& ex) {
315  state_ = Exception;
316  std::ostringstream ost;
317  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
318  ex.addContext(ost.str());
319  throw;
320  }
321  }
ModuleDescription const & description() const
Definition: Worker.h:182
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:576
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext & streamContext 
)

Definition at line 347 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

347  {
348  try {
349  convertException::wrap([&]() {
350  streamContext.setTransition(StreamContext::Transition::kEndStream);
351  streamContext.setEventID(EventID(0, 0, 0));
352  streamContext.setRunIndex(RunIndex::invalidRunIndex());
353  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
354  streamContext.setTimestamp(Timestamp());
355  ParentContext parentContext(&streamContext);
356  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
358  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
359  implEndStream(id);
360  });
361  }
362  catch(cms::Exception& ex) {
363  state_ = Exception;
364  std::ostringstream ost;
365  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
366  ex.addContext(ost.str());
367  throw;
368  }
369  }
ModuleDescription const & description() const
Definition: Worker.h:182
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:576
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception ex,
ModuleCallingContext const *  mcc 
)
staticprivate

Definition at line 101 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

Referenced by handleExternalWorkException() and runAcquire().

103  {
104 
105  ModuleCallingContext const* imcc = mcc;
106  while( (imcc->type() == ParentContext::Type::kModule) or
107  (imcc->type() == ParentContext::Type::kInternal) ) {
108  std::ostringstream iost;
109  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
110  iost << "Prefetching for module ";
111  } else {
112  iost << "Calling method for module ";
113  }
114  iost << imcc->moduleDescription()->moduleName() << "/'"
115  << imcc->moduleDescription()->moduleLabel() << "'";
116 
117  if(imcc->type() == ParentContext::Type::kInternal) {
118  iost << " (probably inside some kind of mixing module)";
119  imcc = imcc->internalContext()->moduleCallingContext();
120  } else {
121  imcc = imcc->moduleCallingContext();
122  }
123  ex.addContext(iost.str());
124  }
125  std::ostringstream ost;
126  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
127  ost << "Prefetching for module ";
128  } else {
129  ost << "Calling method for module ";
130  }
131  ost << imcc->moduleDescription()->moduleName() << "/'"
132  << imcc->moduleDescription()->moduleLabel() << "'";
133  ex.addContext(ost.str());
134 
135  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
136  ost.str("");
137  ost << "Running path '";
138  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
139  ex.addContext(ost.str());
140  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
141  if(streamContext) {
142  ost.str("");
143  edm::exceptionContext(ost,*streamContext);
144  ex.addContext(ost.str());
145  }
146  } else {
147  if (imcc->type() == ParentContext::Type::kStream) {
148  ost.str("");
149  edm::exceptionContext(ost, *(imcc->streamContext()) );
150  ex.addContext(ost.str());
151  } else if(imcc->type() == ParentContext::Type::kGlobal) {
152  ost.str("");
153  edm::exceptionContext(ost, *(imcc->globalContext()) );
154  ex.addContext(ost.str());
155  }
156  }
157  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To. This is based on ActivityRegistry and is current per [date lost in extraction]. Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application. Each possible callback has some defined arguments, which we here list in angle brackets, e.g. < void, edm::EventID const &, edm::Timestamp const & >. We also list in braces which AR_WATCH_USING_METHOD_ is used for those callbacks. [Doxygen appears to have attached this Activities.doc text to the C++ alternative token] or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:227
std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr const *  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 430 of file Worker.cc.

References exceptionContext(), edm::Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

Referenced by edm::Worker::HandleExternalWorkExceptionTask::execute().

431  {
433  try {
434  convertException::wrap([iEPtr]() {
435  std::rethrow_exception(*iEPtr);
436  });
437  } catch(cms::Exception &ex) {
438  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
440  return std::current_exception();
441  }
442  }
443  return *iEPtr;
444  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
bool ranAcquireWithoutException_
Definition: Worker.h:591
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
auto wrap(F iFunc) -> decltype(iFunc())
virtual bool edm::Worker::hasAcquire ( ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implBeginJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual void edm::Worker::implDoAcquire ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc,
WaitingTaskWithArenaHolder holder 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protectedpure virtual
virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper  
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protectedpure virtual
virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 377 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

377  {
378  if(earlyDeleteHelper_) {
379  earlyDeleteHelper_->moduleRan(iEvent);
380  }
381  }
int iEvent
Definition: GenABIO.cc:230
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:587
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask iTask,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 207 of file Worker.cc.

References actReg_, edm::Principal::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, mps_monitormerge::items, itemsToGetFrom(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

207  {
208  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
209  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
210 
212 
213  if(iPrincipal.branchType()==InEvent) {
214  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
215  }
216 
217  //Need to be sure the ref count isn't set to 0 immediately
218  iTask->increment_ref_count();
219  for(auto const& item : items) {
220  ProductResolverIndex productResolverIndex = item.productResolverIndex();
221  bool skipCurrentProcess = item.skipCurrentProcess();
222  if(productResolverIndex != ProductResolverIndexAmbiguous) {
223  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
224  }
225  }
226 
227  if(iPrincipal.branchType()==InEvent) {
229  }
230 
231  if(0 == iTask->decrement_ref_count()) {
232  //if everything finishes before we leave this routine, we need to launch the task
233  tbb::task::spawn(*iTask);
234  }
235  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::prePrefetchSelectionAsync ( WaitingTask task,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 237 of file Worker.cc.

References pyrootRender::destroy(), edm::WaitingTaskList::doneWaiting(), implDoPrePrefetchSelection(), edm::ServiceRegistry::instance(), mps_monitormerge::items, itemsToGetForSelection(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ServiceRegistry::presentToken(), edm::ProductResolverIndexAmbiguous, timesRun_, and waitingTasks_.

239  {
240  std::vector<ProductResolverIndexAndSkipBit> items;
241  itemsToGetForSelection(items);
242 
243  successTask->increment_ref_count();
244  auto token = ServiceRegistry::instance().presentToken();
245 
246  auto choiceTask = edm::make_waiting_task(tbb::task::allocate_root(),
247  [id,successTask,iPrincipal,this,token](std::exception_ptr const*) {
248  ServiceRegistry::Operate guard(token);
249  try {
250  if( not implDoPrePrefetchSelection(id,*iPrincipal,&moduleCallingContext_) ) {
251  timesRun_.fetch_add(1,std::memory_order_relaxed);
252  setPassed<true>();
253  waitingTasks_.doneWaiting(nullptr);
254  //TBB requires that destroyed tasks have count 0
255  if ( 0 == successTask->decrement_ref_count() ) {
256  tbb::task::destroy(*successTask);
257  }
258  return;
259  }
260  } catch(...) {}
261  if(0 == successTask->decrement_ref_count()) {
262  tbb::task::spawn(*successTask);
263  }
264  });
265 
266  choiceTask->increment_ref_count();
267  for(auto const& item : items) {
268  ProductResolverIndex productResolverIndex = item.productResolverIndex();
269  bool skipCurrentProcess = item.skipCurrentProcess();
270  if(productResolverIndex != ProductResolverIndexAmbiguous) {
271  iPrincipal->prefetchAsync(choiceTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
272  }
273  }
274 
275  if(0 == choiceTask->decrement_ref_count()) {
276  tbb::task::spawn(*choiceTask);
277  }
278  }
unsigned int ProductResolverIndex
def destroy(e)
Definition: pyrootRender.py:13
ServiceToken presentToken() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
static ServiceRegistry & instance()
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::atomic< int > timesRun_
Definition: Worker.h:571
void edm::Worker::prePrefetchSelectionAsync ( WaitingTask task,
StreamID  stream,
void const *   
)
inline

Definition at line 133 of file Worker.h.

135  {assert(false);}
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper helper 
)
inline

Definition at line 170 of file Worker.h.

Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 172 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

172  {
173  cached_exception_ = std::exception_ptr();
174  state_ = Ready;
176  workStarted_ = false;
178  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:578
std::atomic< bool > workStarted_
Definition: Worker.h:590
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:583
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
std::atomic< State > state_
Definition: Worker.h:576
int numberOfPathsOn_
Definition: Worker.h:577
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 168 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 167 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
void edm::Worker::runAcquire ( EventPrincipal const &  ep,
EventSetup const &  es,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder holder 
)
private

Definition at line 383 of file Worker.cc.

References exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

386  {
387 
388  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
389  try {
391  {
392  this->implDoAcquire(ep, es, &moduleCallingContext_, holder);
393  });
394  } catch(cms::Exception& ex) {
396  TransitionIDValue<EventPrincipal> idValue(ep);
397  if(shouldRethrowException(std::current_exception(), parentContext, true, idValue)) {
398  timesRun_.fetch_add(1,std::memory_order_relaxed);
399  throw;
400  }
401  }
402  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
virtual void implDoAcquire(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:571
void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
EventPrincipal const &  ep,
EventSetup const &  es,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 404 of file Worker.cc.

References edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), and shouldRethrowException().

408  {
410  std::exception_ptr exceptionPtr;
411  if(iEPtr) {
412  assert(*iEPtr);
413  TransitionIDValue<EventPrincipal> idValue(ep);
414  if(shouldRethrowException(*iEPtr, parentContext, true, idValue)) {
415  exceptionPtr = *iEPtr;
416  }
418  } else {
419  try {
420  runAcquire(ep, es, parentContext, holder);
422  } catch(...) {
423  exceptionPtr = std::current_exception();
424  }
425  }
426  // It is important this is after runAcquire completely finishes
427  holder.doneWaiting(exceptionPtr);
428  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:591
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
void runAcquire(EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
Definition: Worker.cc:383
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1059 of file Worker.h.

References edm::exceptionContext(), edm::EventSetup::get(), and edm::convertException::wrap().

1063  {
1064  //unscheduled producers should advance this
1065  //if (T::isEvent_) {
1066  // ++timesVisited_;
1067  //}
1068  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1069  if (T::isEvent_) {
1070  timesRun_.fetch_add(1,std::memory_order_relaxed);
1071  }
1072 
1073  bool rc = true;
1074  try {
1076  {
1077  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
1078 
1079  if (rc) {
1080  setPassed<T::isEvent_>();
1081  } else {
1082  setFailed<T::isEvent_>();
1083  }
1084  });
1085  } catch(cms::Exception& ex) {
1087  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
1088  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1089  assert(not cached_exception_);
1090  setException<T::isEvent_>(std::current_exception());
1091  std::rethrow_exception(cached_exception_);
1092  } else {
1093  rc = setPassed<T::isEvent_>();
1094  }
1095  }
1096 
1097  return rc;
1098  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
std::exception_ptr cached_exception_
Definition: Worker.h:583
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:571
template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 869 of file Worker.h.

References edm::ModuleCallingContext::kInvalid and edm::ModuleCallingContext::setContext().

874  {
875  std::exception_ptr exceptionPtr;
876  if(iEPtr) {
877  assert(*iEPtr);
878  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
879  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
880  exceptionPtr = *iEPtr;
881  setException<T::isEvent_>(exceptionPtr);
882  } else {
883  setPassed<T::isEvent_>();
884  }
886  } else {
887  try {
888  runModule<T>(ep,es,streamID,parentContext,context);
889  } catch(...) {
890  exceptionPtr = std::current_exception();
891  }
892  }
893  waitingTasks_.doneWaiting(exceptionPtr);
894  return exceptionPtr;
895  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1101 of file Worker.h.

Referenced by edm::Path::finished().

1105  {
1106 
1107  timesVisited_.fetch_add(1,std::memory_order_relaxed);
1108  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1109  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
1110  }
std::atomic< int > timesVisited_
Definition: Worker.h:572
virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
privatepure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry areg)

The signals are required to live longer than the last call to 'doWork'; this was done to improve performance based on profiling.

Definition at line 96 of file Worker.cc.

References actReg_.

96  {
97  actReg_ = areg;
98  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper iHelper)

Definition at line 281 of file Worker.cc.

References earlyDeleteHelper_.

281  {
282  earlyDeleteHelper_=iHelper;
283  }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:587
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inlineprivate

Definition at line 345 of file Worker.h.

References Exception.

345  {
346  if (IS_EVENT) {
347  timesExcept_.fetch_add(1,std::memory_order_relaxed);
348  }
349  cached_exception_ = iException; // propagate_const<T> has no reset() function
350  state_ = Exception;
351  return cached_exception_;
352  }
std::atomic< int > timesExcept_
Definition: Worker.h:575
std::exception_ptr cached_exception_
Definition: Worker.h:583
std::atomic< State > state_
Definition: Worker.h:576
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inlineprivate

Definition at line 336 of file Worker.h.

References Fail.

336  {
337  if(IS_EVENT) {
338  timesFailed_.fetch_add(1,std::memory_order_relaxed);
339  }
340  state_ = Fail;
341  return false;
342  }
std::atomic< int > timesFailed_
Definition: Worker.h:574
std::atomic< State > state_
Definition: Worker.h:576
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inlineprivate

Definition at line 327 of file Worker.h.

References Pass.

327  {
328  if(IS_EVENT) {
329  timesPassed_.fetch_add(1,std::memory_order_relaxed);
330  }
331  state_ = Pass;
332  return true;
333  }
std::atomic< State > state_
Definition: Worker.h:576
std::atomic< int > timesPassed_
Definition: Worker.h:573
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 159 of file Worker.cc.

References mps_fire::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

Referenced by runAcquire() and runAcquireAfterAsyncPrefetch().

162  {
163 
164  // NOTE: the warning printed as a result of ignoring or failing
165  // a module will only be printed during the full true processing
166  // pass of this module
167 
168  // Get the action corresponding to this exception. However, if processing
169  // something other than an event (e.g. run, lumi) always rethrow.
170  if(not isEvent) {
171  return true;
172  }
173  try {
174  convertException::wrap([&]() {
175  std::rethrow_exception(iPtr);
176  });
177  } catch(cms::Exception &ex) {
179 
180  if(action == exception_actions::Rethrow) {
181  return true;
182  }
183 
184  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
185 
186  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
187  // as IgnoreCompletely, so any subsequent OutputModules are still run.
188  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
189  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
190  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
191  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
192 
193  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
194  action == exception_actions::FailPath) {
196  }
197  }
199  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
200  return false;
201  }
202  }
203  return true;
204  }
ModuleDescription const & description() const
Definition: Worker.h:182
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:582
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 371 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

371  {
372  if( 0 == --numberOfPathsLeftToRun_) {
374  }
375  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:578
std::exception_ptr cached_exception_
Definition: Worker.h:583
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
State edm::Worker::state ( ) const
inline

Definition at line 223 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

223 { return state_; }
std::atomic< State > state_
Definition: Worker.h:576
int edm::Worker::timesExcept ( ) const
inline

Definition at line 222 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

222 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:575
int edm::Worker::timesFailed ( ) const
inline

Definition at line 221 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

221 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:574
int edm::Worker::timesPass ( ) const
inline

Definition at line 225 of file Worker.h.

225 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:220
int edm::Worker::timesPassed ( ) const
inline

Definition at line 220 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

220 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:573
int edm::Worker::timesRun ( ) const
inline

Definition at line 218 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

218 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:571
int edm::Worker::timesVisited ( ) const
inline

Definition at line 219 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

219 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:572
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual
virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual
virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual
virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual
virtual std::string edm::Worker::workerType ( ) const
protectedpure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 228 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 582 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 585 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 583 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 587 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private
std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 578 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 577 of file Worker.h.

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 591 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

std::atomic<State> edm::Worker::state_
private

Definition at line 576 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 575 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 574 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 573 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 571 of file Worker.h.

Referenced by prePrefetchSelectionAsync(), and runAcquire().

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 572 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 589 of file Worker.h.

Referenced by prePrefetchSelectionAsync(), and skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 590 of file Worker.h.