CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference (abstract)

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  AcquireTask
 
class  AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >
 
class  HandleExternalWorkExceptionTask
 
class  RunModuleTask
 
struct  TaskQueueAdaptor
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual SerialTaskQueue * globalLuminosityBlocksQueue ()=0
 
virtual SerialTaskQueue * globalRunsQueue ()=0
 
virtual bool hasAccumulator () const =0
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void prePrefetchSelectionAsync (WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
 
void prePrefetchSelectionAsync (WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
virtual bool wantsGlobalLuminosityBlocks () const =0
 
virtual bool wantsGlobalRuns () const =0
 
virtual bool wantsStreamLuminosityBlocks () const =0
 
virtual bool wantsStreamRuns () const =0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implDoAcquire (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
virtual bool implNeedToRunSelection () const =0
 
virtual void itemsToGetForSelection (std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
std::exception_ptr handleExternalWorkException (std::exception_ptr const *iEPtr, ParentContext const &parentContext)
 
virtual bool hasAcquire () const =0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
 
void runAcquire (EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
 
void runAcquireAfterAsyncPrefetch (std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
bool ranAcquireWithoutException_
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 79 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 81 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 82 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 73 of file Worker.cc.

74  :
75  timesRun_(0),
76  timesVisited_(0),
77  timesPassed_(0),
78  timesFailed_(0),
79  timesExcept_(0),
80  state_(Ready),
84  actions_(iActions),
86  actReg_(),
87  earlyDeleteHelper_(nullptr),
88  workStarted_(false),
90  {
91  }
std::atomic< int > timesVisited_
Definition: Worker.h:607
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:613
std::atomic< int > timesExcept_
Definition: Worker.h:610
std::atomic< bool > workStarted_
Definition: Worker.h:625
std::atomic< int > timesFailed_
Definition: Worker.h:609
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
ExceptionToActionTable const * actions_
Definition: Worker.h:617
bool ranAcquireWithoutException_
Definition: Worker.h:626
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
std::exception_ptr cached_exception_
Definition: Worker.h:618
std::atomic< State > state_
Definition: Worker.h:611
int numberOfPathsOn_
Definition: Worker.h:612
std::atomic< int > timesPassed_
Definition: Worker.h:608
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:622
std::atomic< int > timesRun_
Definition: Worker.h:606
edm::Worker::~Worker ( )
virtual

Definition at line 93 of file Worker.cc.

93  {
94  }
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 274 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo(), and edm::WorkerT< T >::implDoAcquire().

274 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
void edm::Worker::addedToPath ( )
inline

Definition at line 220 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

220  {
222  }
int numberOfPathsOn_
Definition: Worker.h:612
void edm::Worker::beginJob ( void  )

Definition at line 289 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

289  {
290  try {
291  convertException::wrap([&]() {
292  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
293  implBeginJob();
294  });
295  }
296  catch(cms::Exception& ex) {
297  state_ = Exception;
298  std::ostringstream ost;
299  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
300  ex.addContext(ost.str());
301  throw;
302  }
303  }
ModuleDescription const & description() const
Definition: Worker.h:188
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:611
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext & streamContext 
)

Definition at line 321 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

321  {
322  try {
323  convertException::wrap([&]() {
324  streamContext.setTransition(StreamContext::Transition::kBeginStream);
325  streamContext.setEventID(EventID(0, 0, 0));
326  streamContext.setRunIndex(RunIndex::invalidRunIndex());
327  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
328  streamContext.setTimestamp(Timestamp());
329  ParentContext parentContext(&streamContext);
330  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
332  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
333  implBeginStream(id);
334  });
335  }
336  catch(cms::Exception& ex) {
337  state_ = Exception;
338  std::ostringstream ost;
339  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
340  ex.addContext(ost.str());
341  throw;
342  }
343  }
ModuleDescription const & description() const
Definition: Worker.h:188
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:611
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask * task)
inline

Definition at line 165 of file Worker.h.

References bk::beginJob().

165  {
166  waitingTasks_.add(task);
167  }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
void edm::Worker::clearCounters ( )
inline

Definition at line 212 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

212  {
213  timesRun_.store(0,std::memory_order_release);
214  timesVisited_.store(0,std::memory_order_release);
215  timesPassed_.store(0,std::memory_order_release);
216  timesFailed_.store(0,std::memory_order_release);
217  timesExcept_.store(0,std::memory_order_release);
218  }
std::atomic< int > timesVisited_
Definition: Worker.h:607
std::atomic< int > timesExcept_
Definition: Worker.h:610
std::atomic< int > timesFailed_
Definition: Worker.h:609
std::atomic< int > timesPassed_
Definition: Worker.h:608
std::atomic< int > timesRun_
Definition: Worker.h:606
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 189 of file Worker.h.

References modifiedElectrons_cfi::processName, and AlCaHLTBitMon_QueryRunRegistry::string.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 995 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), createBeamHaloJobs::queue, Ready, and edm::ModuleCallingContext::setContext().

999  {
1000 
1001  if (T::isEvent_) {
1002  timesVisited_.fetch_add(1,std::memory_order_relaxed);
1003  }
1004  bool rc = false;
1005 
1006  switch(state_) {
1007  case Ready: break;
1008  case Pass: return true;
1009  case Fail: return false;
1010  case Exception: {
1011  std::rethrow_exception(cached_exception_);
1012  }
1013  }
1014 
1015  bool expected = false;
1016  if(not workStarted_.compare_exchange_strong(expected, true) ) {
1017  //another thread beat us here
1018  auto waitTask = edm::make_empty_waiting_task();
1019  waitTask->increment_ref_count();
1020 
1021  waitingTasks_.add(waitTask.get());
1022 
1023  waitTask->wait_for_all();
1024 
1025  switch(state_) {
1026  case Ready: assert(false);
1027  case Pass: return true;
1028  case Fail: return false;
1029  case Exception: {
1030  std::rethrow_exception(cached_exception_);
1031  }
1032  }
1033  }
1034 
1035  //Need the context to be set until after any exception is resolved
1037 
1038  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
1039  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
1040 
1041  if (T::isEvent_) {
1042  //if have TriggerResults based selection we want to reject the event before doing prefetching
1043  if ( workerhelper::CallImpl<T>::needToRunSelection(this)) {
1044  auto waitTask = edm::make_empty_waiting_task();
1045  waitTask->set_ref_count(2);
1046  prePrefetchSelectionAsync(waitTask.get(),
1048  streamID, &ep);
1049  waitTask->decrement_ref_count();
1050  waitTask->wait_for_all();
1051 
1052  if(state() != Ready) {
1053  //The selection must have rejected this event
1054  return true;
1055  }
1056  }
1057  auto waitTask = edm::make_empty_waiting_task();
1058  {
1059  //Make sure signal is sent once the prefetching is done
1060  // [the 'pre' signal was sent in prefetchAsync]
1061  //The purpose of this block is to send the signal after wait_for_all
1062  auto sentryFunc = [this](void*) {
1064  };
1065  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
1066 
1067  //set count to 2 since wait_for_all requires value to not go to 0
1068  waitTask->set_ref_count(2);
1069 
1070  prefetchAsync(waitTask.get(),ServiceRegistry::instance().presentToken(), parentContext, ep);
1071  waitTask->decrement_ref_count();
1072  waitTask->wait_for_all();
1073  }
1074  if(waitTask->exceptionPtr() != nullptr) {
1075  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
1076  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1077  setException<T::isEvent_>(*waitTask->exceptionPtr());
1079  std::rethrow_exception(cached_exception_);
1080  } else {
1081  setPassed<T::isEvent_>();
1082  waitingTasks_.doneWaiting(nullptr);
1083  return true;
1084  }
1085  }
1086  }
1087 
1088  //successful prefetch so no reset necessary
1089  prefetchSentry.release();
1090  if(auto queue = serializeRunModule()) {
1091  auto serviceToken = ServiceRegistry::instance().presentToken();
1092  queue.pushAndWait([&]() {
1093  //Need to make the services available
1094  ServiceRegistry::Operate guard(serviceToken);
1095  try {
1096  rc = runModule<T>(ep,es,streamID,parentContext,context);
1097  } catch(...) {
1098  }
1099  });
1100  } else {
1101  try {
1102  rc = runModule<T>(ep,es,streamID,parentContext,context);
1103  } catch(...) {
1104  }
1105  }
1106  if(state_ == Exception) {
1108  std::rethrow_exception(cached_exception_);
1109  }
1110 
1111  waitingTasks_.doneWaiting(nullptr);
1112  return rc;
1113  }
std::atomic< int > timesVisited_
Definition: Worker.h:607
std::atomic< bool > workStarted_
Definition: Worker.h:625
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
State state() const
Definition: Worker.h:229
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
std::exception_ptr cached_exception_
Definition: Worker.h:618
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:611
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:207
virtual TaskQueueAdaptor serializeRunModule()=0
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:237
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:367
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask * task,
typename T::MyPrincipal const &  ep,
EventSetup const &  c,
ServiceToken const &  token,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 852 of file Worker.h.

References pyrootRender::destroy(), edm::ModuleCallingContext::kPrefetching, edm::make_waiting_task(), eostools::move(), AlCaHLTBitMon_ParallelJobs::p, fetchall_from_DQM_v2::release, edm::ModuleCallingContext::setContext(), and lumiQTWidget::t.

Referenced by edm::WorkerInPath::runWorkerAsync().

858  {
859  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
860  return;
861  }
862 
863  waitingTasks_.add(task);
864  if(T::isEvent_) {
865  timesVisited_.fetch_add(1,std::memory_order_relaxed);
866  }
867 
868  bool expected = false;
869  if(workStarted_.compare_exchange_strong(expected,true)) {
871 
872  //if have TriggerResults based selection we want to reject the event before doing prefetching
873  if(workerhelper::CallImpl<T>::needToRunSelection(this)) {
874  //We need to run the selection in a different task so that
875  // we can prefetch the data needed for the selection
876  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
877  this, ep,es,token,streamID,parentContext,context);
878 
879  //make sure the task is either run or destroyed
880  struct DestroyTask {
881  DestroyTask(edm::WaitingTask* iTask):
882  m_task(iTask) {}
883 
884  ~DestroyTask() {
885  auto p = m_task.load();
886  if(p) {
888  }
889  }
890 
892  auto t = m_task.load();
893  m_task.store(nullptr);
894  return t;
895  }
896 
897  std::atomic<edm::WaitingTask*> m_task;
898  };
899 
900  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
901  auto selectionTask = make_waiting_task(tbb::task::allocate_root(), [ownRunTask,parentContext,&ep,token, this] (std::exception_ptr const* ) mutable {
902 
903  ServiceRegistry::Operate guard(token);
904  prefetchAsync(ownRunTask->release(), token, parentContext, ep);
905  });
906  prePrefetchSelectionAsync(selectionTask,token,streamID, &ep);
907  } else {
908  WaitingTask* moduleTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
909  this, ep, es, token, streamID, parentContext, context);
910  if (T::isEvent_ && hasAcquire()) {
911  WaitingTaskWithArenaHolder runTaskHolder(
912  new (tbb::task::allocate_root())
913  HandleExternalWorkExceptionTask(this,
914  moduleTask,
915  parentContext));
916  moduleTask = new (tbb::task::allocate_root()) AcquireTask<T>(
917  this, ep, es, token, parentContext, std::move(runTaskHolder));
918  }
919  prefetchAsync(moduleTask, token, parentContext, ep);
920  }
921  }
922  }
std::atomic< int > timesVisited_
Definition: Worker.h:607
def destroy(e)
Definition: pyrootRender.py:14
std::atomic< bool > workStarted_
Definition: Worker.h:625
void add(WaitingTask *)
Adds task to the waiting list.
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
virtual bool hasAcquire() const =0
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:207
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:237
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
def move(src, dest)
Definition: eostools.py:511
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask * task,
typename T::MyPrincipal const &  principal,
EventSetup const &  c,
ServiceToken const &  token,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 954 of file Worker.h.

References edm::make_functor_task(), cmsRelvalreport::principal(), and createBeamHaloJobs::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

960  {
961  if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
962  return;
963  }
964  waitingTasks_.add(task);
965  bool expected = false;
966  if(workStarted_.compare_exchange_strong(expected,true)) {
967 
968  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
969  {
970  std::exception_ptr exceptionPtr;
971  try {
972  //Need to make the services available
973  ServiceRegistry::Operate guard(serviceToken);
974 
975  this->runModule<T>(principal,
976  es,
977  streamID,
978  parentContext,
979  context);
980  } catch( ... ) {
981  exceptionPtr = std::current_exception();
982  }
983  this->waitingTasks_.doneWaiting(exceptionPtr);
984  };
985  if(auto queue = this->serializeRunModule()) {
986  queue.push( toDo);
987  } else {
988  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
989  tbb::task::spawn(*task);
990  }
991  }
992  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:625
void add(WaitingTask *)
Adds task to the waiting list.
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
virtual TaskQueueAdaptor serializeRunModule()=0
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 367 of file Worker.h.

367  {
368  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
369  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
void edm::Worker::endJob ( void  )

Definition at line 305 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

305  {
306  try {
307  convertException::wrap([&]() {
308  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
309  implEndJob();
310  });
311  }
312  catch(cms::Exception& ex) {
313  state_ = Exception;
314  std::ostringstream ost;
315  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
316  ex.addContext(ost.str());
317  throw;
318  }
319  }
ModuleDescription const & description() const
Definition: Worker.h:188
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:611
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext & streamContext 
)

Definition at line 345 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

345  {
346  try {
347  convertException::wrap([&]() {
348  streamContext.setTransition(StreamContext::Transition::kEndStream);
349  streamContext.setEventID(EventID(0, 0, 0));
350  streamContext.setRunIndex(RunIndex::invalidRunIndex());
351  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
352  streamContext.setTimestamp(Timestamp());
353  ParentContext parentContext(&streamContext);
354  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
356  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
357  implEndStream(id);
358  });
359  }
360  catch(cms::Exception& ex) {
361  state_ = Exception;
362  std::ostringstream ost;
363  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
364  ex.addContext(ost.str());
365  throw;
366  }
367  }
ModuleDescription const & description() const
Definition: Worker.h:188
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:611
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception &  ex,
ModuleCallingContext const *  mcc 
)
static private

Definition at line 101 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

Referenced by handleExternalWorkException(), and runAcquire().

103  {
104 
105  ModuleCallingContext const* imcc = mcc;
106  while( (imcc->type() == ParentContext::Type::kModule) or
107  (imcc->type() == ParentContext::Type::kInternal) ) {
108  std::ostringstream iost;
109  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
110  iost << "Prefetching for module ";
111  } else {
112  iost << "Calling method for module ";
113  }
114  iost << imcc->moduleDescription()->moduleName() << "/'"
115  << imcc->moduleDescription()->moduleLabel() << "'";
116 
117  if(imcc->type() == ParentContext::Type::kInternal) {
118  iost << " (probably inside some kind of mixing module)";
119  imcc = imcc->internalContext()->moduleCallingContext();
120  } else {
121  imcc = imcc->moduleCallingContext();
122  }
123  ex.addContext(iost.str());
124  }
125  std::ostringstream ost;
126  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
127  ost << "Prefetching for module ";
128  } else {
129  ost << "Calling method for module ";
130  }
131  ost << imcc->moduleDescription()->moduleName() << "/'"
132  << imcc->moduleDescription()->moduleLabel() << "'";
133  ex.addContext(ost.str());
134 
135  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
136  ost.str("");
137  ost << "Running path '";
138  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
139  ex.addContext(ost.str());
140  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
141  if(streamContext) {
142  ost.str("");
143  edm::exceptionContext(ost,*streamContext);
144  ex.addContext(ost.str());
145  }
146  } else {
147  if (imcc->type() == ParentContext::Type::kStream) {
148  ost.str("");
149  edm::exceptionContext(ost, *(imcc->streamContext()) );
150  ex.addContext(ost.str());
151  } else if(imcc->type() == ParentContext::Type::kGlobal) {
152  ost.str("");
153  edm::exceptionContext(ost, *(imcc->globalContext()) );
154  ex.addContext(ost.str());
155  }
156  }
157  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To. This is based on ActivityRegistry and is current per […]. Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application. Each possible callback has some defined […], which we here list in angle brackets, e.g. < void, edm::EventID const &, edm::Timestamp const & >. We also list in braces which AR_WATCH_USING_METHOD_ is used for those […] or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual SerialTaskQueue* edm::Worker::globalLuminosityBlocksQueue ( )
pure virtual
virtual SerialTaskQueue* edm::Worker::globalRunsQueue ( )
pure virtual
std::exception_ptr edm::Worker::handleExternalWorkException ( std::exception_ptr const *  iEPtr,
ParentContext const &  parentContext 
)
private

Definition at line 428 of file Worker.cc.

References exceptionContext(), edm::Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(), moduleCallingContext_, ranAcquireWithoutException_, and edm::convertException::wrap().

Referenced by edm::Worker::HandleExternalWorkExceptionTask::execute().

429  {
431  try {
432  convertException::wrap([iEPtr]() {
433  std::rethrow_exception(*iEPtr);
434  });
435  } catch(cms::Exception &ex) {
436  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
438  return std::current_exception();
439  }
440  }
441  return *iEPtr;
442  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
bool ranAcquireWithoutException_
Definition: Worker.h:626
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
auto wrap(F iFunc) -> decltype(iFunc())
virtual bool edm::Worker::hasAccumulator ( ) const
pure virtual
virtual bool edm::Worker::hasAcquire ( ) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implBeginJob ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual void edm::Worker::implDoAcquire ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc,
WaitingTaskWithArenaHolder &  holder 
)
protected pure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protected pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual bool edm::Worker::implNeedToRunSelection ( ) const
protected pure virtual
virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper &  
)
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGetForSelection ( std::vector< ProductResolverIndexAndSkipBit > &  ) const
protected pure virtual
virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 375 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

375  {
376  if(earlyDeleteHelper_) {
377  earlyDeleteHelper_->moduleRan(iEvent);
378  }
379  }
int iEvent
Definition: GenABIO.cc:230
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:622
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask *  iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
private pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask *  iTask,
ServiceToken const &  token,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 207 of file Worker.cc.

References actReg_, edm::Principal::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, mps_monitormerge::items, itemsToGetFrom(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

207  {
208  // Prefetch products the module declares it consumes (not including the products it may consume)
209  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
210 
212 
213  if(iPrincipal.branchType()==InEvent) {
214  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
215  }
216 
217  //Need to be sure the ref count isn't set to 0 immediately
218  iTask->increment_ref_count();
219  for(auto const& item : items) {
220  ProductResolverIndex productResolverIndex = item.productResolverIndex();
221  bool skipCurrentProcess = item.skipCurrentProcess();
222  if(productResolverIndex != ProductResolverIndexAmbiguous) {
223  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
224  }
225  }
226 
227  if(iPrincipal.branchType()==InEvent) {
229  }
230 
231  if(0 == iTask->decrement_ref_count()) {
232  //if everything finishes before we leave this routine, we need to launch the task
233  tbb::task::spawn(*iTask);
234  }
235  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::prePrefetchSelectionAsync ( WaitingTask *  task,
ServiceToken const &  token,
StreamID  stream,
EventPrincipal const *  iPrincipal 
)

Definition at line 237 of file Worker.cc.

References pyrootRender::destroy(), edm::WaitingTaskList::doneWaiting(), implDoPrePrefetchSelection(), mps_monitormerge::items, itemsToGetForSelection(), edm::make_waiting_task(), moduleCallingContext_, edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, timesRun_, and waitingTasks_.

240  {
241  successTask->increment_ref_count();
242 
243  auto choiceTask = edm::make_waiting_task(tbb::task::allocate_root(),
244  [id,successTask,iPrincipal,this,token](std::exception_ptr const*) {
245  ServiceRegistry::Operate guard(token);
246  try {
247  if( not implDoPrePrefetchSelection(id,*iPrincipal,&moduleCallingContext_) ) {
248  timesRun_.fetch_add(1,std::memory_order_relaxed);
249  setPassed<true>();
250  waitingTasks_.doneWaiting(nullptr);
251  //TBB requires that destroyed tasks have count 0
252  if ( 0 == successTask->decrement_ref_count() ) {
253  tbb::task::destroy(*successTask);
254  }
255  return;
256  }
257  } catch(...) {}
258  if(0 == successTask->decrement_ref_count()) {
259  tbb::task::spawn(*successTask);
260  }
261  });
262 
263  WaitingTaskHolder choiceHolder{choiceTask};
264 
265  std::vector<ProductResolverIndexAndSkipBit> items;
266  itemsToGetForSelection(items);
267 
268  for(auto const& item : items) {
269  ProductResolverIndex productResolverIndex = item.productResolverIndex();
270  bool skipCurrentProcess = item.skipCurrentProcess();
271  if(productResolverIndex != ProductResolverIndexAmbiguous) {
272  iPrincipal->prefetchAsync(choiceTask,productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
273  }
274  }
275  choiceHolder.doneWaiting(std::exception_ptr{});
276  }
unsigned int ProductResolverIndex
def destroy(e)
Definition: pyrootRender.py:14
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::atomic< int > timesRun_
Definition: Worker.h:606
void edm::Worker::prePrefetchSelectionAsync ( WaitingTask *  task,
ServiceToken const &  ,
StreamID  stream,
void const *   
)
inline

Definition at line 137 of file Worker.h.

140  {assert(false);}
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper &  helper 
)
inline

Definition at line 176 of file Worker.h.

Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 178 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

178  {
179  cached_exception_ = std::exception_ptr();
180  state_ = Ready;
182  workStarted_ = false;
184  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:613
std::atomic< bool > workStarted_
Definition: Worker.h:625
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:618
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
std::atomic< State > state_
Definition: Worker.h:611
int numberOfPathsOn_
Definition: Worker.h:612
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 174 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 173 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
void edm::Worker::runAcquire ( EventPrincipal const &  ep,
EventSetup const &  es,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder &  holder 
)
private

Definition at line 381 of file Worker.cc.

References exceptionContext(), implDoAcquire(), moduleCallingContext_, shouldRethrowException(), timesRun_, and edm::convertException::wrap().

Referenced by runAcquireAfterAsyncPrefetch().

384  {
385 
386  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
387  try {
389  {
390  this->implDoAcquire(ep, es, &moduleCallingContext_, holder);
391  });
392  } catch(cms::Exception& ex) {
394  TransitionIDValue<EventPrincipal> idValue(ep);
395  if(shouldRethrowException(std::current_exception(), parentContext, true, idValue)) {
396  timesRun_.fetch_add(1,std::memory_order_relaxed);
397  throw;
398  }
399  }
400  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
virtual void implDoAcquire(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:606
void edm::Worker::runAcquireAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
EventPrincipal const &  ep,
EventSetup const &  es,
ParentContext const &  parentContext,
WaitingTaskWithArenaHolder  holder 
)
private

Definition at line 402 of file Worker.cc.

References edm::WaitingTaskWithArenaHolder::doneWaiting(), edm::ModuleCallingContext::kInvalid, moduleCallingContext_, ranAcquireWithoutException_, runAcquire(), edm::ModuleCallingContext::setContext(), and shouldRethrowException().

406  {
408  std::exception_ptr exceptionPtr;
409  if(iEPtr) {
410  assert(*iEPtr);
411  TransitionIDValue<EventPrincipal> idValue(ep);
412  if(shouldRethrowException(*iEPtr, parentContext, true, idValue)) {
413  exceptionPtr = *iEPtr;
414  }
416  } else {
417  try {
418  runAcquire(ep, es, parentContext, holder);
420  } catch(...) {
421  exceptionPtr = std::current_exception();
422  }
423  }
424  // It is important this is after runAcquire completely finishes
425  holder.doneWaiting(exceptionPtr);
426  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:626
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
void runAcquire(EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
Definition: Worker.cc:381
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 1117 of file Worker.h.

References edm::exceptionContext(), edm::EventSetup::get(), and edm::convertException::wrap().

1121  {
1122  //unscheduled producers should advance this
1123  //if (T::isEvent_) {
1124  // ++timesVisited_;
1125  //}
1126  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1127  if (T::isEvent_) {
1128  timesRun_.fetch_add(1,std::memory_order_relaxed);
1129  }
1130 
1131  bool rc = true;
1132  try {
1134  {
1135  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
1136 
1137  if (rc) {
1138  setPassed<T::isEvent_>();
1139  } else {
1140  setFailed<T::isEvent_>();
1141  }
1142  });
1143  } catch(cms::Exception& ex) {
1145  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
1146  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1147  assert(not cached_exception_);
1148  setException<T::isEvent_>(std::current_exception());
1149  std::rethrow_exception(cached_exception_);
1150  } else {
1151  rc = setPassed<T::isEvent_>();
1152  }
1153  }
1154 
1155  return rc;
1156  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
std::exception_ptr cached_exception_
Definition: Worker.h:618
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:606
template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 925 of file Worker.h.

References edm::ModuleCallingContext::kInvalid, and edm::ModuleCallingContext::setContext().

930  {
931  std::exception_ptr exceptionPtr;
932  if(iEPtr) {
933  assert(*iEPtr);
934  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
935  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
936  exceptionPtr = *iEPtr;
937  setException<T::isEvent_>(exceptionPtr);
938  } else {
939  setPassed<T::isEvent_>();
940  }
942  } else {
943  try {
944  runModule<T>(ep,es,streamID,parentContext,context);
945  } catch(...) {
946  exceptionPtr = std::current_exception();
947  }
948  }
949  waitingTasks_.doneWaiting(exceptionPtr);
950  return exceptionPtr;
951  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 1159 of file Worker.h.

Referenced by edm::Path::finished().

1163  {
1164 
1165  timesVisited_.fetch_add(1,std::memory_order_relaxed);
1166  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1167  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
1168  }
std::atomic< int > timesVisited_
Definition: Worker.h:607
virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
private pure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry >  areg)

The signals are required to live longer than the last call to 'doWork'. This was done to improve performance based on profiling.

Definition at line 96 of file Worker.cc.

References actReg_.

96  {
97  actReg_ = areg;
98  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper *  iHelper)

Definition at line 279 of file Worker.cc.

References earlyDeleteHelper_.

279  {
280  earlyDeleteHelper_=iHelper;
281  }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:622
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inline private

Definition at line 353 of file Worker.h.

References Exception.

353  {
354  if (IS_EVENT) {
355  timesExcept_.fetch_add(1,std::memory_order_relaxed);
356  }
357  cached_exception_ = iException; // propagate_const<T> has no reset() function
358  state_ = Exception;
359  return cached_exception_;
360  }
std::atomic< int > timesExcept_
Definition: Worker.h:610
std::exception_ptr cached_exception_
Definition: Worker.h:618
std::atomic< State > state_
Definition: Worker.h:611
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inline private

Definition at line 344 of file Worker.h.

References Fail.

344  {
345  if(IS_EVENT) {
346  timesFailed_.fetch_add(1,std::memory_order_relaxed);
347  }
348  state_ = Fail;
349  return false;
350  }
std::atomic< int > timesFailed_
Definition: Worker.h:609
std::atomic< State > state_
Definition: Worker.h:611
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inline private

Definition at line 335 of file Worker.h.

References Pass.

335  {
336  if(IS_EVENT) {
337  timesPassed_.fetch_add(1,std::memory_order_relaxed);
338  }
339  state_ = Pass;
340  return true;
341  }
std::atomic< State > state_
Definition: Worker.h:611
std::atomic< int > timesPassed_
Definition: Worker.h:608
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 159 of file Worker.cc.

References mps_fire::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

Referenced by runAcquire(), and runAcquireAfterAsyncPrefetch().

162  {
163 
164  // NOTE: the warning printed as a result of ignoring or failing
165  // a module will only be printed during the full true processing
166  // pass of this module
167 
168  // Get the action corresponding to this exception. However, if processing
169  // something other than an event (e.g. run, lumi) always rethrow.
170  if(not isEvent) {
171  return true;
172  }
173  try {
174  convertException::wrap([&]() {
175  std::rethrow_exception(iPtr);
176  });
177  } catch(cms::Exception &ex) {
179 
180  if(action == exception_actions::Rethrow) {
181  return true;
182  }
183 
184  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
185 
186  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
187  // as IgnoreCompletely, so any subsequent OutputModules are still run.
188  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
189  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
190  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
191  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
192 
193  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
194  action == exception_actions::FailPath) {
196  }
197  }
199  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
200  return false;
201  }
202  }
203  return true;
204  }
ModuleDescription const & description() const
Definition: Worker.h:188
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:617
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 369 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

369  {
370  if( 0 == --numberOfPathsLeftToRun_) {
372  }
373  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:613
std::exception_ptr cached_exception_
Definition: Worker.h:618
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
State edm::Worker::state ( ) const
inline

Definition at line 229 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

229 { return state_; }
std::atomic< State > state_
Definition: Worker.h:611
int edm::Worker::timesExcept ( ) const
inline

Definition at line 228 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

228 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:610
int edm::Worker::timesFailed ( ) const
inline

Definition at line 227 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

227 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:609
int edm::Worker::timesPass ( ) const
inline

Definition at line 231 of file Worker.h.

231 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:226
int edm::Worker::timesPassed ( ) const
inline

Definition at line 226 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

226 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:608
int edm::Worker::timesRun ( ) const
inline

Definition at line 224 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

224 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:606
int edm::Worker::timesVisited ( ) const
inline

Definition at line 225 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

225 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:607
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual bool edm::Worker::wantsGlobalLuminosityBlocks ( ) const
pure virtual
virtual bool edm::Worker::wantsGlobalRuns ( ) const
pure virtual
virtual bool edm::Worker::wantsStreamLuminosityBlocks ( ) const
pure virtual
virtual bool edm::Worker::wantsStreamRuns ( ) const
pure virtual
virtual std::string edm::Worker::workerType ( ) const
protected pure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 236 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 617 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 620 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 618 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 622 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private
std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 613 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 612 of file Worker.h.

bool edm::Worker::ranAcquireWithoutException_
private

Definition at line 626 of file Worker.h.

Referenced by handleExternalWorkException(), and runAcquireAfterAsyncPrefetch().

std::atomic<State> edm::Worker::state_
private

Definition at line 611 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 610 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 609 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 608 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 606 of file Worker.h.

Referenced by prePrefetchSelectionAsync(), and runAcquire().

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 607 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 624 of file Worker.h.

Referenced by prePrefetchSelectionAsync(), and skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 625 of file Worker.h.