CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Referenceabstract

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  RunModuleTask
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfoconsumesInfo () const =0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Workeroperator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void postForkReacquireResources (unsigned int iChildIndex, unsigned int iNumberOfChildren)
 
void preForkReleaseResources ()
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, edm::ProductResolverIndex > const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistryactivityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
virtual void implPostForkReacquireResources (unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
 
virtual void implPreForkReleaseResources ()=0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ParentContext const &parentContext, Principal const &)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual SerialTaskQueueChainserializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistryactReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
std::atomic< Statestate_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 76 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 78 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 79 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 72 of file Worker.cc.

73  :
74  timesRun_(0),
75  timesVisited_(0),
76  timesPassed_(0),
77  timesFailed_(0),
78  timesExcept_(0),
79  state_(Ready),
83  actions_(iActions),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
std::atomic< int > timesVisited_
Definition: Worker.h:396
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:402
std::atomic< int > timesExcept_
Definition: Worker.h:399
std::atomic< bool > workStarted_
Definition: Worker.h:414
std::atomic< int > timesFailed_
Definition: Worker.h:398
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
ExceptionToActionTable const * actions_
Definition: Worker.h:406
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
std::exception_ptr cached_exception_
Definition: Worker.h:407
std::atomic< State > state_
Definition: Worker.h:400
int numberOfPathsOn_
Definition: Worker.h:401
std::atomic< int > timesPassed_
Definition: Worker.h:397
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:411
std::atomic< int > timesRun_
Definition: Worker.h:395
edm::Worker::~Worker ( )
virtual

Definition at line 91 of file Worker.cc.

91  {
92  }
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inlineprotected

Definition at line 206 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo().

206 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
void edm::Worker::addedToPath ( )
inline

Definition at line 162 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

162  {
164  }
int numberOfPathsOn_
Definition: Worker.h:401
void edm::Worker::beginJob ( void  )

Definition at line 245 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

245  {
246  try {
247  convertException::wrap([&]() {
248  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
249  implBeginJob();
250  });
251  }
252  catch(cms::Exception& ex) {
253  state_ = Exception;
254  std::ostringstream ost;
255  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
256  ex.addContext(ost.str());
257  throw;
258  }
259  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:400
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 277 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

277  {
278  try {
279  convertException::wrap([&]() {
280  streamContext.setTransition(StreamContext::Transition::kBeginStream);
281  streamContext.setEventID(EventID(0, 0, 0));
282  streamContext.setRunIndex(RunIndex::invalidRunIndex());
283  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
284  streamContext.setTimestamp(Timestamp());
285  ParentContext parentContext(&streamContext);
286  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
288  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
289  implBeginStream(id);
290  });
291  }
292  catch(cms::Exception& ex) {
293  state_ = Exception;
294  std::ostringstream ost;
295  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
296  ex.addContext(ost.str());
297  throw;
298  }
299  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:400
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask task)
inline

Definition at line 107 of file Worker.h.

References bk::beginJob().

107  {
108  waitingTasks_.add(task);
109  }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
void edm::Worker::clearCounters ( )
inline

Definition at line 154 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

154  {
155  timesRun_.store(0,std::memory_order_release);
156  timesVisited_.store(0,std::memory_order_release);
157  timesPassed_.store(0,std::memory_order_release);
158  timesFailed_.store(0,std::memory_order_release);
159  timesExcept_.store(0,std::memory_order_release);
160  }
std::atomic< int > timesVisited_
Definition: Worker.h:396
std::atomic< int > timesExcept_
Definition: Worker.h:399
std::atomic< int > timesFailed_
Definition: Worker.h:398
std::atomic< int > timesPassed_
Definition: Worker.h:397
std::atomic< int > timesRun_
Definition: Worker.h:395
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 133 of file Worker.h.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 712 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), mps_fire::queue, Ready, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorker().

716  {
717 
718  if (T::isEvent_) {
719  timesVisited_.fetch_add(1,std::memory_order_relaxed);
720  }
721  bool rc = false;
722 
723  switch(state_) {
724  case Ready: break;
725  case Pass: return true;
726  case Fail: return false;
727  case Exception: {
728  std::rethrow_exception(cached_exception_);
729  }
730  }
731 
732  bool expected = false;
733  if(not workStarted_.compare_exchange_strong(expected, true) ) {
734  //another thread beat us here
735  auto waitTask = edm::make_empty_waiting_task();
736  waitTask->increment_ref_count();
737 
738  waitingTasks_.add(waitTask.get());
739 
740  waitTask->wait_for_all();
741 
742  switch(state_) {
743  case Ready: assert(false);
744  case Pass: return true;
745  case Fail: return false;
746  case Exception: {
747  std::rethrow_exception(cached_exception_);
748  }
749  }
750  }
751 
752  //Need the context to be set until after any exception is resolved
754 
755  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
756  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
757 
758  if (T::isEvent_) {
759  try {
760  //if have TriggerResults based selection we want to reject the event before doing prefetching
761  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
762  timesRun_.fetch_add(1,std::memory_order_relaxed);
763  rc = setPassed<T::isEvent_>();
764  waitingTasks_.doneWaiting(nullptr);
765  return true;
766  }
767  }catch(...) {}
768  auto waitTask = edm::make_empty_waiting_task();
769  {
770  //Make sure signal is sent once the prefetching is done
771  // [the 'pre' signal was sent in prefetchAsync]
772  //The purpose of this block is to send the signal after wait_for_all
773  auto sentryFunc = [this](void*) {
775  };
776  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
777 
778  //set count to 2 since wait_for_all requires value to not go to 0
779  waitTask->set_ref_count(2);
780 
781  prefetchAsync(waitTask.get(),parentContext, ep);
782  waitTask->decrement_ref_count();
783  waitTask->wait_for_all();
784  }
785  if(waitTask->exceptionPtr() != nullptr) {
786  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
787  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
788  setException<T::isEvent_>(*waitTask->exceptionPtr());
790  std::rethrow_exception(cached_exception_);
791  } else {
792  setPassed<T::isEvent_>();
793  waitingTasks_.doneWaiting(nullptr);
794  return true;
795  }
796  }
797  }
798 
799  //successful prefetch so no reset necessary
800  prefetchSentry.release();
801  if(auto queue = serializeRunModule()) {
802  auto serviceToken = ServiceRegistry::instance().presentToken();
803  queue->pushAndWait([&]() {
804  //Need to make the services available
805  ServiceRegistry::Operate guard(serviceToken);
806  try {
807  rc = runModule<T>(ep,es,streamID,parentContext,context);
808  } catch(...) {
809  }
810  });
811  } else {
812  try {
813  rc = runModule<T>(ep,es,streamID,parentContext,context);
814  } catch(...) {
815  }
816  }
817  if(state_ == Exception) {
819  std::rethrow_exception(cached_exception_);
820  }
821 
822  waitingTasks_.doneWaiting(nullptr);
823  return rc;
824  }
std::atomic< int > timesVisited_
Definition: Worker.h:396
std::atomic< bool > workStarted_
Definition: Worker.h:414
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
std::exception_ptr cached_exception_
Definition: Worker.h:407
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:400
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:301
std::atomic< int > timesRun_
Definition: Worker.h:395
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask task,
typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 615 of file Worker.h.

References edm::ModuleCallingContext::kPrefetching, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorkerAsync().

620  {
621  waitingTasks_.add(task);
622  if(T::isEvent_) {
623  timesVisited_.fetch_add(1,std::memory_order_relaxed);
624  }
625 
626  bool expected = false;
627  if(workStarted_.compare_exchange_strong(expected,true)) {
629 
630  //if have TriggerResults based selection we want to reject the event before doing prefetching
631  try {
632  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
633  setPassed<T::isEvent_>();
634  waitingTasks_.doneWaiting(nullptr);
635  return;
636  }
637  }catch(...) {}
638 
639  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
640  this, ep,es,streamID,parentContext,context);
641  prefetchAsync(runTask, parentContext, ep);
642  }
643  }
std::atomic< int > timesVisited_
Definition: Worker.h:396
std::atomic< bool > workStarted_
Definition: Worker.h:414
void add(WaitingTask *)
Adds task to the waiting list.
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask task,
typename T::MyPrincipal const &  principal,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 674 of file Worker.h.

References edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::ServiceRegistry::presentToken(), cmsRelvalreport::principal(), and mps_fire::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

679  {
680  waitingTasks_.add(task);
681  bool expected = false;
682  if(workStarted_.compare_exchange_strong(expected,true)) {
683  auto serviceToken = ServiceRegistry::instance().presentToken();
684 
685  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
686  {
687  std::exception_ptr exceptionPtr;
688  try {
689  //Need to make the services available
690  ServiceRegistry::Operate guard(serviceToken);
691 
692  this->runModule<T>(principal,
693  es,
694  streamID,
695  parentContext,
696  context);
697  } catch( ... ) {
698  exceptionPtr = std::current_exception();
699  }
700  this->waitingTasks_.doneWaiting(exceptionPtr);
701  };
702  if(auto queue = this->serializeRunModule()) {
703  queue->push( toDo);
704  } else {
705  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
706  tbb::task::spawn(*task);
707  }
708  }
709  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:414
void add(WaitingTask *)
Adds task to the waiting list.
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
static ServiceRegistry & instance()
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inlineprivate

Definition at line 301 of file Worker.h.

301  {
302  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
303  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
void edm::Worker::endJob ( void  )

Definition at line 261 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

261  {
262  try {
263  convertException::wrap([&]() {
264  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
265  implEndJob();
266  });
267  }
268  catch(cms::Exception& ex) {
269  state_ = Exception;
270  std::ostringstream ost;
271  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
272  ex.addContext(ost.str());
273  throw;
274  }
275  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:400
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 301 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

301  {
302  try {
303  convertException::wrap([&]() {
304  streamContext.setTransition(StreamContext::Transition::kEndStream);
305  streamContext.setEventID(EventID(0, 0, 0));
306  streamContext.setRunIndex(RunIndex::invalidRunIndex());
307  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
308  streamContext.setTimestamp(Timestamp());
309  ParentContext parentContext(&streamContext);
310  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
312  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
313  implEndStream(id);
314  });
315  }
316  catch(cms::Exception& ex) {
317  state_ = Exception;
318  std::ostringstream ost;
319  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
320  ex.addContext(ost.str());
321  throw;
322  }
323  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:400
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception ex,
ModuleCallingContext const *  mcc 
)
staticprivate

Definition at line 99 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

101  {
102 
103  ModuleCallingContext const* imcc = mcc;
104  while( (imcc->type() == ParentContext::Type::kModule) or
105  (imcc->type() == ParentContext::Type::kInternal) ) {
106  std::ostringstream iost;
107  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
108  iost << "Prefetching for module ";
109  } else {
110  iost << "Calling method for module ";
111  }
112  iost << imcc->moduleDescription()->moduleName() << "/'"
113  << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if(imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
124  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'"
130  << imcc->moduleDescription()->moduleLabel() << "'";
131  ex.addContext(ost.str());
132 
133  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
134  ost.str("");
135  ost << "Running path '";
136  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
137  ex.addContext(ost.str());
138  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
139  if(streamContext) {
140  ost.str("");
141  edm::exceptionContext(ost,*streamContext);
142  ex.addContext(ost.str());
143  }
144  } else {
145  if (imcc->type() == ParentContext::Type::kStream) {
146  ost.str("");
147  edm::exceptionContext(ost, *(imcc->streamContext()) );
148  ex.addContext(ost.str());
149  } else if(imcc->type() == ParentContext::Type::kGlobal) {
150  ost.str("");
151  edm::exceptionContext(ost, *(imcc->globalContext()) );
152  ex.addContext(ost.str());
153  }
154  }
155  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void edm::Worker::implBeginJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual void edm::Worker::implPostForkReacquireResources ( unsigned int  iChildIndex,
unsigned int  iNumberOfChildren 
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implPreForkReleaseResources ( )
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper  
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 331 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

331  {
332  if(earlyDeleteHelper_) {
333  earlyDeleteHelper_->moduleRan(iEvent);
334  }
335  }
int iEvent
Definition: GenABIO.cc:230
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:411
void edm::Worker::postForkReacquireResources ( unsigned int  iChildIndex,
unsigned int  iNumberOfChildren 
)
inline

Definition at line 119 of file Worker.h.

Referenced by edm::Schedule::postForkReacquireResources().

119 {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask iTask,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 205 of file Worker.cc.

References actReg_, edm::Principal::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, mps_monitormerge::items, itemsToGetFrom(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

205  {
206  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
207  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
208 
210 
211  if(iPrincipal.branchType()==InEvent) {
212  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
213  }
214 
215  //Need to be sure the ref count isn't set to 0 immediately
216  iTask->increment_ref_count();
217  for(auto const& item : items) {
218  ProductResolverIndex productResolverIndex = item.productResolverIndex();
219  bool skipCurrentProcess = item.skipCurrentProcess();
220  if(productResolverIndex != ProductResolverIndexAmbiguous) {
221  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
222  }
223  }
224 
225  if(iPrincipal.branchType()==InEvent) {
227  }
228 
229  if(0 == iTask->decrement_ref_count()) {
230  //if everything finishes before we leave this routine, we need to launch the task
231  tbb::task::spawn(*iTask);
232  }
233  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::preForkReleaseResources ( )
inline

Definition at line 118 of file Worker.h.

Referenced by edm::Schedule::preForkReleaseResources().

virtual void implPreForkReleaseResources()=0
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper helper 
)
inline

Definition at line 120 of file Worker.h.

Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 122 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

122  {
123  cached_exception_ = std::exception_ptr();
124  state_ = Ready;
126  workStarted_ = false;
128  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:402
std::atomic< bool > workStarted_
Definition: Worker.h:414
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:407
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
std::atomic< State > state_
Definition: Worker.h:400
int numberOfPathsOn_
Definition: Worker.h:401
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, edm::ProductResolverIndex > const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 116 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 115 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 828 of file Worker.h.

References edm::exceptionContext(), edm::EventSetup::get(), and edm::convertException::wrap().

832  {
833  //unscheduled producers should advance this
834  //if (T::isEvent_) {
835  // ++timesVisited_;
836  //}
837  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
838  if (T::isEvent_) {
839  timesRun_.fetch_add(1,std::memory_order_relaxed);
840  }
841 
842  bool rc = true;
843  try {
845  {
846  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
847 
848  if (rc) {
849  setPassed<T::isEvent_>();
850  } else {
851  setFailed<T::isEvent_>();
852  }
853  });
854  } catch(cms::Exception& ex) {
856  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
857  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
858  assert(not cached_exception_);
859  setException<T::isEvent_>(std::current_exception());
860  std::rethrow_exception(cached_exception_);
861  } else {
862  rc = setPassed<T::isEvent_>();
863  }
864  }
865 
866  return rc;
867  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
std::exception_ptr cached_exception_
Definition: Worker.h:407
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:395
template<typename T >
void edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 646 of file Worker.h.

References edm::ModuleCallingContext::kInvalid, and edm::ModuleCallingContext::setContext().

Referenced by edm::Worker::RunModuleTask< T >::execute().

651  {
652  std::exception_ptr exceptionPtr;
653  if(iEPtr) {
654  assert(*iEPtr);
655  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
656  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
657  exceptionPtr = *iEPtr;
658  setException<T::isEvent_>(exceptionPtr);
659  } else {
660  setPassed<T::isEvent_>();
661  }
663  } else {
664  try {
665  runModule<T>(ep,es,streamID,parentContext,context);
666  } catch(...) {
667  exceptionPtr = std::current_exception();
668  }
669  }
670  waitingTasks_.doneWaiting(exceptionPtr);
671  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
virtual SerialTaskQueueChain* edm::Worker::serializeRunModule ( )
privatepure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry areg)

The signals are required to live longer than the last call to 'doWork' this was done to improve performance based on profiling

Definition at line 94 of file Worker.cc.

References actReg_.

94  {
95  actReg_ = areg;
96  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper iHelper)

Definition at line 235 of file Worker.cc.

References earlyDeleteHelper_.

235  {
236  earlyDeleteHelper_=iHelper;
237  }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:411
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inlineprivate

Definition at line 288 of file Worker.h.

References Exception.

288  {
289  if (IS_EVENT) {
290  timesExcept_.fetch_add(1,std::memory_order_relaxed);
291  }
292  cached_exception_ = iException; // propagate_const<T> has no reset() function
293  state_ = Exception;
294  return cached_exception_;
295  }
std::atomic< int > timesExcept_
Definition: Worker.h:399
std::exception_ptr cached_exception_
Definition: Worker.h:407
std::atomic< State > state_
Definition: Worker.h:400
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inlineprivate

Definition at line 279 of file Worker.h.

References Fail.

279  {
280  if(IS_EVENT) {
281  timesFailed_.fetch_add(1,std::memory_order_relaxed);
282  }
283  state_ = Fail;
284  return false;
285  }
std::atomic< int > timesFailed_
Definition: Worker.h:398
std::atomic< State > state_
Definition: Worker.h:400
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inlineprivate

Definition at line 270 of file Worker.h.

References Pass.

270  {
271  if(IS_EVENT) {
272  timesPassed_.fetch_add(1,std::memory_order_relaxed);
273  }
274  state_ = Pass;
275  return true;
276  }
std::atomic< State > state_
Definition: Worker.h:400
std::atomic< int > timesPassed_
Definition: Worker.h:397
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 157 of file Worker.cc.

References mps_alisetup::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

160  {
161 
162  // NOTE: the warning printed as a result of ignoring or failing
163  // a module will only be printed during the full true processing
164  // pass of this module
165 
166  // Get the action corresponding to this exception. However, if processing
167  // something other than an event (e.g. run, lumi) always rethrow.
168  if(not isEvent) {
169  return true;
170  }
171  try {
172  convertException::wrap([&]() {
173  std::rethrow_exception(iPtr);
174  });
175  } catch(cms::Exception &ex) {
177 
178  if(action == exception_actions::Rethrow) {
179  return true;
180  }
181 
182  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
183 
184  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
185  // as IgnoreCompletely, so any subsequent OutputModules are still run.
186  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
187  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
188  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
189  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
190 
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
192  action == exception_actions::FailPath) {
194  }
195  }
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
201  return true;
202  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:406
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 325 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

325  {
326  if( 0 == --numberOfPathsLeftToRun_) {
328  }
329  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:402
std::exception_ptr cached_exception_
Definition: Worker.h:407
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
State edm::Worker::state ( ) const
inline

Definition at line 171 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

171 { return state_; }
std::atomic< State > state_
Definition: Worker.h:400
int edm::Worker::timesExcept ( ) const
inline

Definition at line 170 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

170 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:399
int edm::Worker::timesFailed ( ) const
inline

Definition at line 169 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

169 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:398
int edm::Worker::timesPass ( ) const
inline

Definition at line 173 of file Worker.h.

173 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:168
int edm::Worker::timesPassed ( ) const
inline

Definition at line 168 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

168 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:397
int edm::Worker::timesRun ( ) const
inline

Definition at line 166 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

166 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:395
int edm::Worker::timesVisited ( ) const
inline

Definition at line 167 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

167 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:396
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual std::string edm::Worker::workerType ( ) const
protectedpure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 176 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 406 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 409 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 407 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 411 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private

Definition at line 404 of file Worker.h.

Referenced by beginStream(), endStream(), prefetchAsync(), and resetModuleDescription().

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 402 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 401 of file Worker.h.

std::atomic<State> edm::Worker::state_
private

Definition at line 400 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 399 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 398 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 397 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 395 of file Worker.h.

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 396 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 413 of file Worker.h.

Referenced by skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 414 of file Worker.h.