CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Referenceabstract

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  RunModuleTask
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfoconsumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Workeroperator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, edm::ProductResolverIndex > const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistryactivityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ParentContext const &parentContext, Principal const &)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual SerialTaskQueueChainserializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistryactReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
std::atomic< Statestate_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 77 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 79 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 80 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 72 of file Worker.cc.

73  :
74  timesRun_(0),
75  timesVisited_(0),
76  timesPassed_(0),
77  timesFailed_(0),
78  timesExcept_(0),
79  state_(Ready),
83  actions_(iActions),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
std::atomic< int > timesVisited_
Definition: Worker.h:401
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:407
std::atomic< int > timesExcept_
Definition: Worker.h:404
std::atomic< bool > workStarted_
Definition: Worker.h:419
std::atomic< int > timesFailed_
Definition: Worker.h:403
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
ExceptionToActionTable const * actions_
Definition: Worker.h:411
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
std::exception_ptr cached_exception_
Definition: Worker.h:412
std::atomic< State > state_
Definition: Worker.h:405
int numberOfPathsOn_
Definition: Worker.h:406
std::atomic< int > timesPassed_
Definition: Worker.h:402
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:416
std::atomic< int > timesRun_
Definition: Worker.h:400
edm::Worker::~Worker ( )
virtual

Definition at line 91 of file Worker.cc.

91  {
92  }
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inlineprotected

Definition at line 214 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo().

214 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
void edm::Worker::addedToPath ( )
inline

Definition at line 170 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

170  {
172  }
int numberOfPathsOn_
Definition: Worker.h:406
void edm::Worker::beginJob ( void  )

Definition at line 245 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

245  {
246  try {
247  convertException::wrap([&]() {
248  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
249  implBeginJob();
250  });
251  }
252  catch(cms::Exception& ex) {
253  state_ = Exception;
254  std::ostringstream ost;
255  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
256  ex.addContext(ost.str());
257  throw;
258  }
259  }
ModuleDescription const & description() const
Definition: Worker.h:138
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:405
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 277 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

277  {
278  try {
279  convertException::wrap([&]() {
280  streamContext.setTransition(StreamContext::Transition::kBeginStream);
281  streamContext.setEventID(EventID(0, 0, 0));
282  streamContext.setRunIndex(RunIndex::invalidRunIndex());
283  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
284  streamContext.setTimestamp(Timestamp());
285  ParentContext parentContext(&streamContext);
286  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
288  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
289  implBeginStream(id);
290  });
291  }
292  catch(cms::Exception& ex) {
293  state_ = Exception;
294  std::ostringstream ost;
295  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
296  ex.addContext(ost.str());
297  throw;
298  }
299  }
ModuleDescription const & description() const
Definition: Worker.h:138
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:405
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask task)
inline

Definition at line 115 of file Worker.h.

References bk::beginJob().

115  {
116  waitingTasks_.add(task);
117  }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:418
void edm::Worker::clearCounters ( )
inline

Definition at line 162 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

162  {
163  timesRun_.store(0,std::memory_order_release);
164  timesVisited_.store(0,std::memory_order_release);
165  timesPassed_.store(0,std::memory_order_release);
166  timesFailed_.store(0,std::memory_order_release);
167  timesExcept_.store(0,std::memory_order_release);
168  }
std::atomic< int > timesVisited_
Definition: Worker.h:401
std::atomic< int > timesExcept_
Definition: Worker.h:404
std::atomic< int > timesFailed_
Definition: Worker.h:403
std::atomic< int > timesPassed_
Definition: Worker.h:402
std::atomic< int > timesRun_
Definition: Worker.h:400
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 139 of file Worker.h.

References modifiedElectrons_cfi::processName, and AlCaHLTBitMon_QueryRunRegistry::string.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 718 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), mps_fire::queue, Ready, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorker().

722  {
723 
724  if (T::isEvent_) {
725  timesVisited_.fetch_add(1,std::memory_order_relaxed);
726  }
727  bool rc = false;
728 
729  switch(state_) {
730  case Ready: break;
731  case Pass: return true;
732  case Fail: return false;
733  case Exception: {
734  std::rethrow_exception(cached_exception_);
735  }
736  }
737 
738  bool expected = false;
739  if(not workStarted_.compare_exchange_strong(expected, true) ) {
740  //another thread beat us here
741  auto waitTask = edm::make_empty_waiting_task();
742  waitTask->increment_ref_count();
743 
744  waitingTasks_.add(waitTask.get());
745 
746  waitTask->wait_for_all();
747 
748  switch(state_) {
749  case Ready: assert(false);
750  case Pass: return true;
751  case Fail: return false;
752  case Exception: {
753  std::rethrow_exception(cached_exception_);
754  }
755  }
756  }
757 
758  //Need the context to be set until after any exception is resolved
760 
761  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
762  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
763 
764  if (T::isEvent_) {
765  try {
766  //if have TriggerResults based selection we want to reject the event before doing prefetching
767  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
768  timesRun_.fetch_add(1,std::memory_order_relaxed);
769  rc = setPassed<T::isEvent_>();
770  waitingTasks_.doneWaiting(nullptr);
771  return true;
772  }
773  }catch(...) {}
774  auto waitTask = edm::make_empty_waiting_task();
775  {
776  //Make sure signal is sent once the prefetching is done
777  // [the 'pre' signal was sent in prefetchAsync]
778  //The purpose of this block is to send the signal after wait_for_all
779  auto sentryFunc = [this](void*) {
781  };
782  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
783 
784  //set count to 2 since wait_for_all requires value to not go to 0
785  waitTask->set_ref_count(2);
786 
787  prefetchAsync(waitTask.get(),parentContext, ep);
788  waitTask->decrement_ref_count();
789  waitTask->wait_for_all();
790  }
791  if(waitTask->exceptionPtr() != nullptr) {
792  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
793  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
794  setException<T::isEvent_>(*waitTask->exceptionPtr());
796  std::rethrow_exception(cached_exception_);
797  } else {
798  setPassed<T::isEvent_>();
799  waitingTasks_.doneWaiting(nullptr);
800  return true;
801  }
802  }
803  }
804 
805  //successful prefetch so no reset necessary
806  prefetchSentry.release();
807  if(auto queue = serializeRunModule()) {
808  auto serviceToken = ServiceRegistry::instance().presentToken();
809  queue->pushAndWait([&]() {
810  //Need to make the services available
811  ServiceRegistry::Operate guard(serviceToken);
812  try {
813  rc = runModule<T>(ep,es,streamID,parentContext,context);
814  } catch(...) {
815  }
816  });
817  } else {
818  try {
819  rc = runModule<T>(ep,es,streamID,parentContext,context);
820  } catch(...) {
821  }
822  }
823  if(state_ == Exception) {
825  std::rethrow_exception(cached_exception_);
826  }
827 
828  waitingTasks_.doneWaiting(nullptr);
829  return rc;
830  }
std::atomic< int > timesVisited_
Definition: Worker.h:401
std::atomic< bool > workStarted_
Definition: Worker.h:419
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
std::exception_ptr cached_exception_
Definition: Worker.h:412
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:418
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:405
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:306
std::atomic< int > timesRun_
Definition: Worker.h:400
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask task,
typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 620 of file Worker.h.

References edm::ModuleCallingContext::kPrefetching, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorkerAsync().

625  {
626  waitingTasks_.add(task);
627  if(T::isEvent_) {
628  timesVisited_.fetch_add(1,std::memory_order_relaxed);
629  }
630 
631  bool expected = false;
632  if(workStarted_.compare_exchange_strong(expected,true)) {
634 
635  //if have TriggerResults based selection we want to reject the event before doing prefetching
636  try {
637  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
638  setPassed<T::isEvent_>();
639  waitingTasks_.doneWaiting(nullptr);
640  return;
641  }
642  }catch(...) {}
643 
644  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
645  this, ep,es,streamID,parentContext,context);
646  prefetchAsync(runTask, parentContext, ep);
647  }
648  }
std::atomic< int > timesVisited_
Definition: Worker.h:401
std::atomic< bool > workStarted_
Definition: Worker.h:419
void add(WaitingTask *)
Adds task to the waiting list.
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:418
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask task,
typename T::MyPrincipal const &  principal,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 680 of file Worker.h.

References edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::ServiceRegistry::presentToken(), cmsRelvalreport::principal(), and mps_fire::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

685  {
686  waitingTasks_.add(task);
687  bool expected = false;
688  if(workStarted_.compare_exchange_strong(expected,true)) {
689  auto serviceToken = ServiceRegistry::instance().presentToken();
690 
691  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
692  {
693  std::exception_ptr exceptionPtr;
694  try {
695  //Need to make the services available
696  ServiceRegistry::Operate guard(serviceToken);
697 
698  this->runModule<T>(principal,
699  es,
700  streamID,
701  parentContext,
702  context);
703  } catch( ... ) {
704  exceptionPtr = std::current_exception();
705  }
706  this->waitingTasks_.doneWaiting(exceptionPtr);
707  };
708  if(auto queue = this->serializeRunModule()) {
709  queue->push( toDo);
710  } else {
711  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
712  tbb::task::spawn(*task);
713  }
714  }
715  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:419
void add(WaitingTask *)
Adds task to the waiting list.
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:418
static ServiceRegistry & instance()
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inlineprivate

Definition at line 306 of file Worker.h.

306  {
307  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
308  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
void edm::Worker::endJob ( void  )

Definition at line 261 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

261  {
262  try {
263  convertException::wrap([&]() {
264  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
265  implEndJob();
266  });
267  }
268  catch(cms::Exception& ex) {
269  state_ = Exception;
270  std::ostringstream ost;
271  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
272  ex.addContext(ost.str());
273  throw;
274  }
275  }
ModuleDescription const & description() const
Definition: Worker.h:138
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:405
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 301 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

301  {
302  try {
303  convertException::wrap([&]() {
304  streamContext.setTransition(StreamContext::Transition::kEndStream);
305  streamContext.setEventID(EventID(0, 0, 0));
306  streamContext.setRunIndex(RunIndex::invalidRunIndex());
307  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
308  streamContext.setTimestamp(Timestamp());
309  ParentContext parentContext(&streamContext);
310  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
312  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
313  implEndStream(id);
314  });
315  }
316  catch(cms::Exception& ex) {
317  state_ = Exception;
318  std::ostringstream ost;
319  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
320  ex.addContext(ost.str());
321  throw;
322  }
323  }
ModuleDescription const & description() const
Definition: Worker.h:138
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:405
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception ex,
ModuleCallingContext const *  mcc 
)
staticprivate

Definition at line 99 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

101  {
102 
103  ModuleCallingContext const* imcc = mcc;
104  while( (imcc->type() == ParentContext::Type::kModule) or
105  (imcc->type() == ParentContext::Type::kInternal) ) {
106  std::ostringstream iost;
107  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
108  iost << "Prefetching for module ";
109  } else {
110  iost << "Calling method for module ";
111  }
112  iost << imcc->moduleDescription()->moduleName() << "/'"
113  << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if(imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
124  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'"
130  << imcc->moduleDescription()->moduleLabel() << "'";
131  ex.addContext(ost.str());
132 
133  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
134  ost.str("");
135  ost << "Running path '";
136  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
137  ex.addContext(ost.str());
138  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
139  if(streamContext) {
140  ost.str("");
141  edm::exceptionContext(ost,*streamContext);
142  ex.addContext(ost.str());
143  }
144  } else {
145  if (imcc->type() == ParentContext::Type::kStream) {
146  ost.str("");
147  edm::exceptionContext(ost, *(imcc->streamContext()) );
148  ex.addContext(ost.str());
149  } else if(imcc->type() == ParentContext::Type::kGlobal) {
150  ost.str("");
151  edm::exceptionContext(ost, *(imcc->globalContext()) );
152  ex.addContext(ost.str());
153  }
154  }
155  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void edm::Worker::implBeginJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper  
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 331 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

331  {
332  if(earlyDeleteHelper_) {
333  earlyDeleteHelper_->moduleRan(iEvent);
334  }
335  }
int iEvent
Definition: GenABIO.cc:230
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:416
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask iTask,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 205 of file Worker.cc.

References actReg_, edm::Principal::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, mps_monitormerge::items, itemsToGetFrom(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

205  {
206  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
207  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
208 
210 
211  if(iPrincipal.branchType()==InEvent) {
212  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
213  }
214 
215  //Need to be sure the ref count isn't set to 0 immediately
216  iTask->increment_ref_count();
217  for(auto const& item : items) {
218  ProductResolverIndex productResolverIndex = item.productResolverIndex();
219  bool skipCurrentProcess = item.skipCurrentProcess();
220  if(productResolverIndex != ProductResolverIndexAmbiguous) {
221  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
222  }
223  }
224 
225  if(iPrincipal.branchType()==InEvent) {
227  }
228 
229  if(0 == iTask->decrement_ref_count()) {
230  //if everything finishes before we leave this routine, we need to launch the task
231  tbb::task::spawn(*iTask);
232  }
233  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper helper 
)
inline

Definition at line 126 of file Worker.h.

Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 128 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

128  {
129  cached_exception_ = std::exception_ptr();
130  state_ = Ready;
132  workStarted_ = false;
134  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:407
std::atomic< bool > workStarted_
Definition: Worker.h:419
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:412
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:418
std::atomic< State > state_
Definition: Worker.h:405
int numberOfPathsOn_
Definition: Worker.h:406
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, edm::ProductResolverIndex > const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 124 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 123 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 834 of file Worker.h.

References edm::exceptionContext(), edm::EventSetup::get(), and edm::convertException::wrap().

838  {
839  //unscheduled producers should advance this
840  //if (T::isEvent_) {
841  // ++timesVisited_;
842  //}
843  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
844  if (T::isEvent_) {
845  timesRun_.fetch_add(1,std::memory_order_relaxed);
846  }
847 
848  bool rc = true;
849  try {
851  {
852  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
853 
854  if (rc) {
855  setPassed<T::isEvent_>();
856  } else {
857  setFailed<T::isEvent_>();
858  }
859  });
860  } catch(cms::Exception& ex) {
862  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
863  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
864  assert(not cached_exception_);
865  setException<T::isEvent_>(std::current_exception());
866  std::rethrow_exception(cached_exception_);
867  } else {
868  rc = setPassed<T::isEvent_>();
869  }
870  }
871 
872  return rc;
873  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
std::exception_ptr cached_exception_
Definition: Worker.h:412
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:400
template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 651 of file Worker.h.

References edm::ModuleCallingContext::kInvalid, and edm::ModuleCallingContext::setContext().

Referenced by edm::Worker::RunModuleTask< T >::execute().

656  {
657  std::exception_ptr exceptionPtr;
658  if(iEPtr) {
659  assert(*iEPtr);
660  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
661  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
662  exceptionPtr = *iEPtr;
663  setException<T::isEvent_>(exceptionPtr);
664  } else {
665  setPassed<T::isEvent_>();
666  }
668  } else {
669  try {
670  runModule<T>(ep,es,streamID,parentContext,context);
671  } catch(...) {
672  exceptionPtr = std::current_exception();
673  }
674  }
675  waitingTasks_.doneWaiting(exceptionPtr);
676  return exceptionPtr;
677  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:418
template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 876 of file Worker.h.

Referenced by edm::Path::finished().

880  {
881 
882  timesVisited_.fetch_add(1,std::memory_order_relaxed);
883  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
884  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
885  }
std::atomic< int > timesVisited_
Definition: Worker.h:401
virtual SerialTaskQueueChain* edm::Worker::serializeRunModule ( )
privatepure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry areg)

The signals are required to live longer than the last call to 'doWork' this was done to improve performance based on profiling

Definition at line 94 of file Worker.cc.

References actReg_.

94  {
95  actReg_ = areg;
96  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:414
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper iHelper)

Definition at line 235 of file Worker.cc.

References earlyDeleteHelper_.

235  {
236  earlyDeleteHelper_=iHelper;
237  }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:416
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inlineprivate

Definition at line 293 of file Worker.h.

References Exception.

293  {
294  if (IS_EVENT) {
295  timesExcept_.fetch_add(1,std::memory_order_relaxed);
296  }
297  cached_exception_ = iException; // propagate_const<T> has no reset() function
298  state_ = Exception;
299  return cached_exception_;
300  }
std::atomic< int > timesExcept_
Definition: Worker.h:404
std::exception_ptr cached_exception_
Definition: Worker.h:412
std::atomic< State > state_
Definition: Worker.h:405
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inlineprivate

Definition at line 284 of file Worker.h.

References Fail.

284  {
285  if(IS_EVENT) {
286  timesFailed_.fetch_add(1,std::memory_order_relaxed);
287  }
288  state_ = Fail;
289  return false;
290  }
std::atomic< int > timesFailed_
Definition: Worker.h:403
std::atomic< State > state_
Definition: Worker.h:405
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inlineprivate

Definition at line 275 of file Worker.h.

References Pass.

275  {
276  if(IS_EVENT) {
277  timesPassed_.fetch_add(1,std::memory_order_relaxed);
278  }
279  state_ = Pass;
280  return true;
281  }
std::atomic< State > state_
Definition: Worker.h:405
std::atomic< int > timesPassed_
Definition: Worker.h:402
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 157 of file Worker.cc.

References mps_alisetup::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

160  {
161 
162  // NOTE: the warning printed as a result of ignoring or failing
163  // a module will only be printed during the full true processing
164  // pass of this module
165 
166  // Get the action corresponding to this exception. However, if processing
167  // something other than an event (e.g. run, lumi) always rethrow.
168  if(not isEvent) {
169  return true;
170  }
171  try {
172  convertException::wrap([&]() {
173  std::rethrow_exception(iPtr);
174  });
175  } catch(cms::Exception &ex) {
177 
178  if(action == exception_actions::Rethrow) {
179  return true;
180  }
181 
182  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
183 
184  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
185  // as IgnoreCompletely, so any subsequent OutputModules are still run.
186  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
187  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
188  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
189  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
190 
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
192  action == exception_actions::FailPath) {
194  }
195  }
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
201  return true;
202  }
ModuleDescription const & description() const
Definition: Worker.h:138
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:411
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 325 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

325  {
326  if( 0 == --numberOfPathsLeftToRun_) {
328  }
329  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:407
std::exception_ptr cached_exception_
Definition: Worker.h:412
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:418
State edm::Worker::state ( ) const
inline

Definition at line 179 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

179 { return state_; }
std::atomic< State > state_
Definition: Worker.h:405
int edm::Worker::timesExcept ( ) const
inline

Definition at line 178 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

178 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:404
int edm::Worker::timesFailed ( ) const
inline

Definition at line 177 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

177 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:403
int edm::Worker::timesPass ( ) const
inline

Definition at line 181 of file Worker.h.

181 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:176
int edm::Worker::timesPassed ( ) const
inline

Definition at line 176 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

176 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:402
int edm::Worker::timesRun ( ) const
inline

Definition at line 174 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

174 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:400
int edm::Worker::timesVisited ( ) const
inline

Definition at line 175 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

175 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:401
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual std::string edm::Worker::workerType ( ) const
protectedpure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 184 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 411 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 414 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 412 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 416 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private

Definition at line 409 of file Worker.h.

Referenced by beginStream(), endStream(), prefetchAsync(), and resetModuleDescription().

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 407 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 406 of file Worker.h.

std::atomic<State> edm::Worker::state_
private

Definition at line 405 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 404 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 403 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 402 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 400 of file Worker.h.

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 401 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 418 of file Worker.h.

Referenced by skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 419 of file Worker.h.