CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Referenceabstract

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  RunModuleTask
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfoconsumesInfo () const =0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Workeroperator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void postForkReacquireResources (unsigned int iChildIndex, unsigned int iNumberOfChildren)
 
void preForkReleaseResources ()
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, edm::ProductResolverIndex > const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistryactivityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
virtual void implPostForkReacquireResources (unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
 
virtual void implPreForkReleaseResources ()=0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFromEvent () const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ParentContext const &parentContext, Principal const &)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual SerialTaskQueueChainserializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistryactReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
std::atomic< Statestate_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 76 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 78 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 79 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 72 of file Worker.cc.

73  :
74  timesRun_(0),
75  timesVisited_(0),
76  timesPassed_(0),
77  timesFailed_(0),
78  timesExcept_(0),
79  state_(Ready),
83  actions_(iActions),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
std::atomic< int > timesVisited_
Definition: Worker.h:393
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:399
std::atomic< int > timesExcept_
Definition: Worker.h:396
std::atomic< bool > workStarted_
Definition: Worker.h:411
std::atomic< int > timesFailed_
Definition: Worker.h:395
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
ExceptionToActionTable const * actions_
Definition: Worker.h:403
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
std::exception_ptr cached_exception_
Definition: Worker.h:404
std::atomic< State > state_
Definition: Worker.h:397
int numberOfPathsOn_
Definition: Worker.h:398
std::atomic< int > timesPassed_
Definition: Worker.h:394
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:408
std::atomic< int > timesRun_
Definition: Worker.h:392
edm::Worker::~Worker ( )
virtual

Definition at line 91 of file Worker.cc.

91  {
92  }
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inlineprotected

Definition at line 206 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo().

206 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
void edm::Worker::addedToPath ( )
inline

Definition at line 162 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

162  {
164  }
int numberOfPathsOn_
Definition: Worker.h:398
void edm::Worker::beginJob ( void  )

Definition at line 241 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

241  {
242  try {
243  convertException::wrap([&]() {
244  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
245  implBeginJob();
246  });
247  }
248  catch(cms::Exception& ex) {
249  state_ = Exception;
250  std::ostringstream ost;
251  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
252  ex.addContext(ost.str());
253  throw;
254  }
255  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:397
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 273 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

273  {
274  try {
275  convertException::wrap([&]() {
276  streamContext.setTransition(StreamContext::Transition::kBeginStream);
277  streamContext.setEventID(EventID(0, 0, 0));
278  streamContext.setRunIndex(RunIndex::invalidRunIndex());
279  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
280  streamContext.setTimestamp(Timestamp());
281  ParentContext parentContext(&streamContext);
282  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
284  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
285  implBeginStream(id);
286  });
287  }
288  catch(cms::Exception& ex) {
289  state_ = Exception;
290  std::ostringstream ost;
291  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
292  ex.addContext(ost.str());
293  throw;
294  }
295  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:397
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask task)
inline

Definition at line 107 of file Worker.h.

References bk::beginJob().

107  {
108  waitingTasks_.add(task);
109  }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
void edm::Worker::clearCounters ( )
inline

Definition at line 154 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

154  {
155  timesRun_.store(0,std::memory_order_release);
156  timesVisited_.store(0,std::memory_order_release);
157  timesPassed_.store(0,std::memory_order_release);
158  timesFailed_.store(0,std::memory_order_release);
159  timesExcept_.store(0,std::memory_order_release);
160  }
std::atomic< int > timesVisited_
Definition: Worker.h:393
std::atomic< int > timesExcept_
Definition: Worker.h:396
std::atomic< int > timesFailed_
Definition: Worker.h:395
std::atomic< int > timesPassed_
Definition: Worker.h:394
std::atomic< int > timesRun_
Definition: Worker.h:392
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 133 of file Worker.h.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 709 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), mps_fire::queue, Ready, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorker().

713  {
714 
715  if (T::isEvent_) {
716  timesVisited_.fetch_add(1,std::memory_order_relaxed);
717  }
718  bool rc = false;
719 
720  switch(state_) {
721  case Ready: break;
722  case Pass: return true;
723  case Fail: return false;
724  case Exception: {
725  std::rethrow_exception(cached_exception_);
726  }
727  }
728 
729  bool expected = false;
730  if(not workStarted_.compare_exchange_strong(expected, true) ) {
731  //another thread beat us here
732  auto waitTask = edm::make_empty_waiting_task();
733  waitTask->increment_ref_count();
734 
735  waitingTasks_.add(waitTask.get());
736 
737  waitTask->wait_for_all();
738 
739  switch(state_) {
740  case Ready: assert(false);
741  case Pass: return true;
742  case Fail: return false;
743  case Exception: {
744  std::rethrow_exception(cached_exception_);
745  }
746  }
747  }
748 
749  //Need the context to be set until after any exception is resolved
751 
752  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
753  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
754 
755  if (T::isEvent_) {
756  try {
757  //if have TriggerResults based selection we want to reject the event before doing prefetching
758  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
759  timesRun_.fetch_add(1,std::memory_order_relaxed);
760  rc = setPassed<T::isEvent_>();
761  waitingTasks_.doneWaiting(nullptr);
762  return true;
763  }
764  }catch(...) {}
765  auto waitTask = edm::make_empty_waiting_task();
766  {
767  //Make sure signal is sent once the prefetching is done
768  // [the 'pre' signal was sent in prefetchAsync]
769  //The purpose of this block is to send the signal after wait_for_all
770  auto sentryFunc = [this](void*) {
772  };
773  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
774 
775  //set count to 2 since wait_for_all requires value to not go to 0
776  waitTask->set_ref_count(2);
777 
778  prefetchAsync(waitTask.get(),parentContext, ep);
779  waitTask->decrement_ref_count();
780  waitTask->wait_for_all();
781  }
782  if(waitTask->exceptionPtr() != nullptr) {
783  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
784  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
785  setException<T::isEvent_>(*waitTask->exceptionPtr());
787  std::rethrow_exception(cached_exception_);
788  } else {
789  setPassed<T::isEvent_>();
790  waitingTasks_.doneWaiting(nullptr);
791  return true;
792  }
793  }
794  }
795 
796  //successful prefetch so no reset necessary
797  prefetchSentry.release();
798  if(auto queue = serializeRunModule()) {
799  auto serviceToken = ServiceRegistry::instance().presentToken();
800  queue->pushAndWait([&]() {
801  //Need to make the services available
802  ServiceRegistry::Operate guard(serviceToken);
803  try {
804  rc = runModule<T>(ep,es,streamID,parentContext,context);
805  } catch(...) {
806  }
807  });
808  } else {
809  try {
810  rc = runModule<T>(ep,es,streamID,parentContext,context);
811  } catch(...) {
812  }
813  }
814  if(state_ == Exception) {
816  std::rethrow_exception(cached_exception_);
817  }
818 
819  waitingTasks_.doneWaiting(nullptr);
820  return rc;
821  }
std::atomic< int > timesVisited_
Definition: Worker.h:393
std::atomic< bool > workStarted_
Definition: Worker.h:411
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
std::exception_ptr cached_exception_
Definition: Worker.h:404
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:397
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:300
std::atomic< int > timesRun_
Definition: Worker.h:392
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask task,
typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 612 of file Worker.h.

References edm::ModuleCallingContext::kPrefetching, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorkerAsync().

617  {
618  waitingTasks_.add(task);
619  if(T::isEvent_) {
620  timesVisited_.fetch_add(1,std::memory_order_relaxed);
621  }
622 
623  bool expected = false;
624  if(workStarted_.compare_exchange_strong(expected,true)) {
626 
627  //if have TriggerResults based selection we want to reject the event before doing prefetching
628  try {
629  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
630  setPassed<T::isEvent_>();
631  waitingTasks_.doneWaiting(nullptr);
632  return;
633  }
634  }catch(...) {}
635 
636  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
637  this, ep,es,streamID,parentContext,context);
638  prefetchAsync(runTask, parentContext, ep);
639  }
640  }
std::atomic< int > timesVisited_
Definition: Worker.h:393
std::atomic< bool > workStarted_
Definition: Worker.h:411
void add(WaitingTask *)
Adds task to the waiting list.
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask task,
typename T::MyPrincipal const &  principal,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 671 of file Worker.h.

References edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::ServiceRegistry::presentToken(), cmsRelvalreport::principal(), and mps_fire::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

676  {
677  waitingTasks_.add(task);
678  bool expected = false;
679  if(workStarted_.compare_exchange_strong(expected,true)) {
680  auto serviceToken = ServiceRegistry::instance().presentToken();
681 
682  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
683  {
684  std::exception_ptr exceptionPtr;
685  try {
686  //Need to make the services available
687  ServiceRegistry::Operate guard(serviceToken);
688 
689  this->runModule<T>(principal,
690  es,
691  streamID,
692  parentContext,
693  context);
694  } catch( ... ) {
695  exceptionPtr = std::current_exception();
696  }
697  this->waitingTasks_.doneWaiting(exceptionPtr);
698  };
699  if(auto queue = this->serializeRunModule()) {
700  queue->push( toDo);
701  } else {
702  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
703  tbb::task::spawn(*task);
704  }
705  }
706  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:411
void add(WaitingTask *)
Adds task to the waiting list.
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
static ServiceRegistry & instance()
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inlineprivate

Definition at line 300 of file Worker.h.

300  {
301  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
302  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
void edm::Worker::endJob ( void  )

Definition at line 257 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

257  {
258  try {
259  convertException::wrap([&]() {
260  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
261  implEndJob();
262  });
263  }
264  catch(cms::Exception& ex) {
265  state_ = Exception;
266  std::ostringstream ost;
267  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
268  ex.addContext(ost.str());
269  throw;
270  }
271  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:397
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 297 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

297  {
298  try {
299  convertException::wrap([&]() {
300  streamContext.setTransition(StreamContext::Transition::kEndStream);
301  streamContext.setEventID(EventID(0, 0, 0));
302  streamContext.setRunIndex(RunIndex::invalidRunIndex());
303  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
304  streamContext.setTimestamp(Timestamp());
305  ParentContext parentContext(&streamContext);
306  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
308  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
309  implEndStream(id);
310  });
311  }
312  catch(cms::Exception& ex) {
313  state_ = Exception;
314  std::ostringstream ost;
315  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
316  ex.addContext(ost.str());
317  throw;
318  }
319  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:397
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception ex,
ModuleCallingContext const *  mcc 
)
staticprivate

Definition at line 99 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

101  {
102 
103  ModuleCallingContext const* imcc = mcc;
104  while( (imcc->type() == ParentContext::Type::kModule) or
105  (imcc->type() == ParentContext::Type::kInternal) ) {
106  std::ostringstream iost;
107  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
108  iost << "Prefetching for module ";
109  } else {
110  iost << "Calling method for module ";
111  }
112  iost << imcc->moduleDescription()->moduleName() << "/'"
113  << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if(imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
124  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'"
130  << imcc->moduleDescription()->moduleLabel() << "'";
131  ex.addContext(ost.str());
132 
133  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
134  ost.str("");
135  ost << "Running path '";
136  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
137  ex.addContext(ost.str());
138  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
139  if(streamContext) {
140  ost.str("");
141  edm::exceptionContext(ost,*streamContext);
142  ex.addContext(ost.str());
143  }
144  } else {
145  if (imcc->type() == ParentContext::Type::kStream) {
146  ost.str("");
147  edm::exceptionContext(ost, *(imcc->streamContext()) );
148  ex.addContext(ost.str());
149  } else if(imcc->type() == ParentContext::Type::kGlobal) {
150  ost.str("");
151  edm::exceptionContext(ost, *(imcc->globalContext()) );
152  ex.addContext(ost.str());
153  }
154  }
155  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void edm::Worker::implBeginJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual void edm::Worker::implPostForkReacquireResources ( unsigned int  iChildIndex,
unsigned int  iNumberOfChildren 
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implPreForkReleaseResources ( )
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper  
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFromEvent ( ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 327 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

327  {
328  if(earlyDeleteHelper_) {
329  earlyDeleteHelper_->moduleRan(iEvent);
330  }
331  }
int iEvent
Definition: GenABIO.cc:230
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:408
void edm::Worker::postForkReacquireResources ( unsigned int  iChildIndex,
unsigned int  iNumberOfChildren 
)
inline

Definition at line 119 of file Worker.h.

Referenced by edm::Schedule::postForkReacquireResources().

119 {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask iTask,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 205 of file Worker.cc.

References actReg_, edm::ModuleCallingContext::getStreamContext(), mps_monitormerge::items, itemsToGetFromEvent(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

205  {
206  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
207  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFromEvent();
208 
210 
211  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
212 
213  //Need to be sure the ref count isn't set to 0 immediately
214  iTask->increment_ref_count();
215  for(auto const& item : items) {
216  ProductResolverIndex productResolverIndex = item.productResolverIndex();
217  bool skipCurrentProcess = item.skipCurrentProcess();
218  if(productResolverIndex != ProductResolverIndexAmbiguous) {
219  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
220  }
221  }
222 
224 
225  if(0 == iTask->decrement_ref_count()) {
226  //if everything finishes before we leave this routine, we need to launch the task
227  tbb::task::spawn(*iTask);
228  }
229  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFromEvent() const =0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::preForkReleaseResources ( )
inline

Definition at line 118 of file Worker.h.

Referenced by edm::Schedule::preForkReleaseResources().

virtual void implPreForkReleaseResources()=0
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper helper 
)
inline

Definition at line 120 of file Worker.h.

Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 122 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

122  {
123  cached_exception_ = std::exception_ptr();
124  state_ = Ready;
126  workStarted_ = false;
128  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:399
std::atomic< bool > workStarted_
Definition: Worker.h:411
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:404
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
std::atomic< State > state_
Definition: Worker.h:397
int numberOfPathsOn_
Definition: Worker.h:398
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, edm::ProductResolverIndex > const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 116 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 115 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 825 of file Worker.h.

References edm::exceptionContext(), edm::EventSetup::get(), and edm::convertException::wrap().

829  {
830  //unscheduled producers should advance this
831  //if (T::isEvent_) {
832  // ++timesVisited_;
833  //}
834  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
835  if (T::isEvent_) {
836  timesRun_.fetch_add(1,std::memory_order_relaxed);
837  }
838 
839  bool rc = true;
840  try {
842  {
843  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
844 
845  if (rc) {
846  setPassed<T::isEvent_>();
847  } else {
848  setFailed<T::isEvent_>();
849  }
850  });
851  } catch(cms::Exception& ex) {
853  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
854  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
855  assert(not cached_exception_);
856  setException<T::isEvent_>(std::current_exception());
857  std::rethrow_exception(cached_exception_);
858  } else {
859  rc = setPassed<T::isEvent_>();
860  }
861  }
862 
863  return rc;
864  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
std::exception_ptr cached_exception_
Definition: Worker.h:404
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:392
template<typename T >
void edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 643 of file Worker.h.

References edm::ModuleCallingContext::kInvalid, and edm::ModuleCallingContext::setContext().

Referenced by edm::Worker::RunModuleTask< T >::execute().

648  {
649  std::exception_ptr exceptionPtr;
650  if(iEPtr) {
651  assert(*iEPtr);
652  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
653  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
654  exceptionPtr = *iEPtr;
655  setException<T::isEvent_>(exceptionPtr);
656  } else {
657  setPassed<T::isEvent_>();
658  }
660  } else {
661  try {
662  runModule<T>(ep,es,streamID,parentContext,context);
663  } catch(...) {
664  exceptionPtr = std::current_exception();
665  }
666  }
667  waitingTasks_.doneWaiting(exceptionPtr);
668  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
virtual SerialTaskQueueChain* edm::Worker::serializeRunModule ( )
privatepure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry areg)

The signals are required to live longer than the last call to 'doWork' this was done to improve performance based on profiling

Definition at line 94 of file Worker.cc.

References actReg_.

94  {
95  actReg_ = areg;
96  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper iHelper)

Definition at line 231 of file Worker.cc.

References earlyDeleteHelper_.

231  {
232  earlyDeleteHelper_=iHelper;
233  }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:408
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inlineprivate

Definition at line 287 of file Worker.h.

References Exception.

287  {
288  if (IS_EVENT) {
289  timesExcept_.fetch_add(1,std::memory_order_relaxed);
290  }
291  cached_exception_ = iException; // propagate_const<T> has no reset() function
292  state_ = Exception;
293  return cached_exception_;
294  }
std::atomic< int > timesExcept_
Definition: Worker.h:396
std::exception_ptr cached_exception_
Definition: Worker.h:404
std::atomic< State > state_
Definition: Worker.h:397
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inlineprivate

Definition at line 278 of file Worker.h.

References Fail.

278  {
279  if(IS_EVENT) {
280  timesFailed_.fetch_add(1,std::memory_order_relaxed);
281  }
282  state_ = Fail;
283  return false;
284  }
std::atomic< int > timesFailed_
Definition: Worker.h:395
std::atomic< State > state_
Definition: Worker.h:397
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inlineprivate

Definition at line 269 of file Worker.h.

References Pass.

269  {
270  if(IS_EVENT) {
271  timesPassed_.fetch_add(1,std::memory_order_relaxed);
272  }
273  state_ = Pass;
274  return true;
275  }
std::atomic< State > state_
Definition: Worker.h:397
std::atomic< int > timesPassed_
Definition: Worker.h:394
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 157 of file Worker.cc.

References mps_alisetup::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

160  {
161 
162  // NOTE: the warning printed as a result of ignoring or failing
163  // a module will only be printed during the full true processing
164  // pass of this module
165 
166  // Get the action corresponding to this exception. However, if processing
167  // something other than an event (e.g. run, lumi) always rethrow.
168  if(not isEvent) {
169  return true;
170  }
171  try {
172  convertException::wrap([&]() {
173  std::rethrow_exception(iPtr);
174  });
175  } catch(cms::Exception &ex) {
177 
178  if(action == exception_actions::Rethrow) {
179  return true;
180  }
181 
182  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
183 
184  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
185  // as IgnoreCompletely, so any subsequent OutputModules are still run.
186  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
187  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
188  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
189  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
190 
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
192  action == exception_actions::FailPath) {
194  }
195  }
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
201  return true;
202  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:403
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 321 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

321  {
322  if( 0 == --numberOfPathsLeftToRun_) {
324  }
325  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:399
std::exception_ptr cached_exception_
Definition: Worker.h:404
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
State edm::Worker::state ( ) const
inline

Definition at line 171 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

171 { return state_; }
std::atomic< State > state_
Definition: Worker.h:397
int edm::Worker::timesExcept ( ) const
inline

Definition at line 170 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

170 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:396
int edm::Worker::timesFailed ( ) const
inline

Definition at line 169 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

169 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:395
int edm::Worker::timesPass ( ) const
inline

Definition at line 173 of file Worker.h.

173 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:168
int edm::Worker::timesPassed ( ) const
inline

Definition at line 168 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

168 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:394
int edm::Worker::timesRun ( ) const
inline

Definition at line 166 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

166 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:392
int edm::Worker::timesVisited ( ) const
inline

Definition at line 167 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

167 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:393
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual std::string edm::Worker::workerType ( ) const
protectedpure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 176 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 403 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 406 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 404 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 408 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private

Definition at line 401 of file Worker.h.

Referenced by beginStream(), endStream(), prefetchAsync(), and resetModuleDescription().

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 399 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 398 of file Worker.h.

std::atomic<State> edm::Worker::state_
private

Definition at line 397 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 396 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 395 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 394 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 392 of file Worker.h.

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 393 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 410 of file Worker.h.

Referenced by skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 411 of file Worker.h.