CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Referenceabstract

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  RunModuleTask
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfoconsumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Workeroperator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void postForkReacquireResources (unsigned int iChildIndex, unsigned int iNumberOfChildren)
 
void preForkReleaseResources ()
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, edm::ProductResolverIndex > const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistryactivityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
virtual void implPostForkReacquireResources (unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
 
virtual void implPreForkReleaseResources ()=0
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ParentContext const &parentContext, Principal const &)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual SerialTaskQueueChainserializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistryactReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
std::atomic< Statestate_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 76 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 78 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 79 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 72 of file Worker.cc.

73  :
74  timesRun_(0),
75  timesVisited_(0),
76  timesPassed_(0),
77  timesFailed_(0),
78  timesExcept_(0),
79  state_(Ready),
83  actions_(iActions),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
std::atomic< int > timesVisited_
Definition: Worker.h:398
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:404
std::atomic< int > timesExcept_
Definition: Worker.h:401
std::atomic< bool > workStarted_
Definition: Worker.h:416
std::atomic< int > timesFailed_
Definition: Worker.h:400
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
ExceptionToActionTable const * actions_
Definition: Worker.h:408
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
std::exception_ptr cached_exception_
Definition: Worker.h:409
std::atomic< State > state_
Definition: Worker.h:402
int numberOfPathsOn_
Definition: Worker.h:403
std::atomic< int > timesPassed_
Definition: Worker.h:399
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:413
std::atomic< int > timesRun_
Definition: Worker.h:397
edm::Worker::~Worker ( )
virtual

Definition at line 91 of file Worker.cc.

91  {
92  }
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inlineprotected

Definition at line 208 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo().

208 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
void edm::Worker::addedToPath ( )
inline

Definition at line 164 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

164  {
166  }
int numberOfPathsOn_
Definition: Worker.h:403
void edm::Worker::beginJob ( void  )

Definition at line 245 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

245  {
246  try {
247  convertException::wrap([&]() {
248  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
249  implBeginJob();
250  });
251  }
252  catch(cms::Exception& ex) {
253  state_ = Exception;
254  std::ostringstream ost;
255  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
256  ex.addContext(ost.str());
257  throw;
258  }
259  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:402
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 277 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

277  {
278  try {
279  convertException::wrap([&]() {
280  streamContext.setTransition(StreamContext::Transition::kBeginStream);
281  streamContext.setEventID(EventID(0, 0, 0));
282  streamContext.setRunIndex(RunIndex::invalidRunIndex());
283  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
284  streamContext.setTimestamp(Timestamp());
285  ParentContext parentContext(&streamContext);
286  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
288  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
289  implBeginStream(id);
290  });
291  }
292  catch(cms::Exception& ex) {
293  state_ = Exception;
294  std::ostringstream ost;
295  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
296  ex.addContext(ost.str());
297  throw;
298  }
299  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:402
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask task)
inline

Definition at line 107 of file Worker.h.

References bk::beginJob().

107  {
108  waitingTasks_.add(task);
109  }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:415
void edm::Worker::clearCounters ( )
inline

Definition at line 156 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

156  {
157  timesRun_.store(0,std::memory_order_release);
158  timesVisited_.store(0,std::memory_order_release);
159  timesPassed_.store(0,std::memory_order_release);
160  timesFailed_.store(0,std::memory_order_release);
161  timesExcept_.store(0,std::memory_order_release);
162  }
std::atomic< int > timesVisited_
Definition: Worker.h:398
std::atomic< int > timesExcept_
Definition: Worker.h:401
std::atomic< int > timesFailed_
Definition: Worker.h:400
std::atomic< int > timesPassed_
Definition: Worker.h:399
std::atomic< int > timesRun_
Definition: Worker.h:397
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 133 of file Worker.h.

References modifiedElectrons_cfi::processName, and AlCaHLTBitMon_QueryRunRegistry::string.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 714 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), mps_fire::queue, Ready, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorker().

718  {
719 
720  if (T::isEvent_) {
721  timesVisited_.fetch_add(1,std::memory_order_relaxed);
722  }
723  bool rc = false;
724 
725  switch(state_) {
726  case Ready: break;
727  case Pass: return true;
728  case Fail: return false;
729  case Exception: {
730  std::rethrow_exception(cached_exception_);
731  }
732  }
733 
734  bool expected = false;
735  if(not workStarted_.compare_exchange_strong(expected, true) ) {
736  //another thread beat us here
737  auto waitTask = edm::make_empty_waiting_task();
738  waitTask->increment_ref_count();
739 
740  waitingTasks_.add(waitTask.get());
741 
742  waitTask->wait_for_all();
743 
744  switch(state_) {
745  case Ready: assert(false);
746  case Pass: return true;
747  case Fail: return false;
748  case Exception: {
749  std::rethrow_exception(cached_exception_);
750  }
751  }
752  }
753 
754  //Need the context to be set until after any exception is resolved
756 
757  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
758  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
759 
760  if (T::isEvent_) {
761  try {
762  //if have TriggerResults based selection we want to reject the event before doing prefetching
763  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
764  timesRun_.fetch_add(1,std::memory_order_relaxed);
765  rc = setPassed<T::isEvent_>();
766  waitingTasks_.doneWaiting(nullptr);
767  return true;
768  }
769  }catch(...) {}
770  auto waitTask = edm::make_empty_waiting_task();
771  {
772  //Make sure signal is sent once the prefetching is done
773  // [the 'pre' signal was sent in prefetchAsync]
774  //The purpose of this block is to send the signal after wait_for_all
775  auto sentryFunc = [this](void*) {
777  };
778  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
779 
780  //set count to 2 since wait_for_all requires value to not go to 0
781  waitTask->set_ref_count(2);
782 
783  prefetchAsync(waitTask.get(),parentContext, ep);
784  waitTask->decrement_ref_count();
785  waitTask->wait_for_all();
786  }
787  if(waitTask->exceptionPtr() != nullptr) {
788  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
789  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
790  setException<T::isEvent_>(*waitTask->exceptionPtr());
792  std::rethrow_exception(cached_exception_);
793  } else {
794  setPassed<T::isEvent_>();
795  waitingTasks_.doneWaiting(nullptr);
796  return true;
797  }
798  }
799  }
800 
801  //successful prefetch so no reset necessary
802  prefetchSentry.release();
803  if(auto queue = serializeRunModule()) {
804  auto serviceToken = ServiceRegistry::instance().presentToken();
805  queue->pushAndWait([&]() {
806  //Need to make the services available
807  ServiceRegistry::Operate guard(serviceToken);
808  try {
809  rc = runModule<T>(ep,es,streamID,parentContext,context);
810  } catch(...) {
811  }
812  });
813  } else {
814  try {
815  rc = runModule<T>(ep,es,streamID,parentContext,context);
816  } catch(...) {
817  }
818  }
819  if(state_ == Exception) {
821  std::rethrow_exception(cached_exception_);
822  }
823 
824  waitingTasks_.doneWaiting(nullptr);
825  return rc;
826  }
std::atomic< int > timesVisited_
Definition: Worker.h:398
std::atomic< bool > workStarted_
Definition: Worker.h:416
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
std::exception_ptr cached_exception_
Definition: Worker.h:409
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:415
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:402
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:303
std::atomic< int > timesRun_
Definition: Worker.h:397
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask task,
typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 617 of file Worker.h.

References edm::ModuleCallingContext::kPrefetching, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorkerAsync().

622  {
623  waitingTasks_.add(task);
624  if(T::isEvent_) {
625  timesVisited_.fetch_add(1,std::memory_order_relaxed);
626  }
627 
628  bool expected = false;
629  if(workStarted_.compare_exchange_strong(expected,true)) {
631 
632  //if have TriggerResults based selection we want to reject the event before doing prefetching
633  try {
634  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
635  setPassed<T::isEvent_>();
636  waitingTasks_.doneWaiting(nullptr);
637  return;
638  }
639  }catch(...) {}
640 
641  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
642  this, ep,es,streamID,parentContext,context);
643  prefetchAsync(runTask, parentContext, ep);
644  }
645  }
std::atomic< int > timesVisited_
Definition: Worker.h:398
std::atomic< bool > workStarted_
Definition: Worker.h:416
void add(WaitingTask *)
Adds task to the waiting list.
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:415
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask task,
typename T::MyPrincipal const &  principal,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 676 of file Worker.h.

References edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::ServiceRegistry::presentToken(), cmsRelvalreport::principal(), and mps_fire::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

681  {
682  waitingTasks_.add(task);
683  bool expected = false;
684  if(workStarted_.compare_exchange_strong(expected,true)) {
685  auto serviceToken = ServiceRegistry::instance().presentToken();
686 
687  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
688  {
689  std::exception_ptr exceptionPtr;
690  try {
691  //Need to make the services available
692  ServiceRegistry::Operate guard(serviceToken);
693 
694  this->runModule<T>(principal,
695  es,
696  streamID,
697  parentContext,
698  context);
699  } catch( ... ) {
700  exceptionPtr = std::current_exception();
701  }
702  this->waitingTasks_.doneWaiting(exceptionPtr);
703  };
704  if(auto queue = this->serializeRunModule()) {
705  queue->push( toDo);
706  } else {
707  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
708  tbb::task::spawn(*task);
709  }
710  }
711  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:416
void add(WaitingTask *)
Adds task to the waiting list.
virtual SerialTaskQueueChain * serializeRunModule()=0
ServiceToken presentToken() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:415
static ServiceRegistry & instance()
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inlineprivate

Definition at line 303 of file Worker.h.

303  {
304  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
305  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
void edm::Worker::endJob ( void  )

Definition at line 261 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

261  {
262  try {
263  convertException::wrap([&]() {
264  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
265  implEndJob();
266  });
267  }
268  catch(cms::Exception& ex) {
269  state_ = Exception;
270  std::ostringstream ost;
271  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
272  ex.addContext(ost.str());
273  throw;
274  }
275  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:402
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext streamContext 
)

Definition at line 301 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

301  {
302  try {
303  convertException::wrap([&]() {
304  streamContext.setTransition(StreamContext::Transition::kEndStream);
305  streamContext.setEventID(EventID(0, 0, 0));
306  streamContext.setRunIndex(RunIndex::invalidRunIndex());
307  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
308  streamContext.setTimestamp(Timestamp());
309  ParentContext parentContext(&streamContext);
310  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
312  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
313  implEndStream(id);
314  });
315  }
316  catch(cms::Exception& ex) {
317  state_ = Exception;
318  std::ostringstream ost;
319  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
320  ex.addContext(ost.str());
321  throw;
322  }
323  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:402
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception ex,
ModuleCallingContext const *  mcc 
)
staticprivate

Definition at line 99 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

101  {
102 
103  ModuleCallingContext const* imcc = mcc;
104  while( (imcc->type() == ParentContext::Type::kModule) or
105  (imcc->type() == ParentContext::Type::kInternal) ) {
106  std::ostringstream iost;
107  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
108  iost << "Prefetching for module ";
109  } else {
110  iost << "Calling method for module ";
111  }
112  iost << imcc->moduleDescription()->moduleName() << "/'"
113  << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if(imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
124  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'"
130  << imcc->moduleDescription()->moduleLabel() << "'";
131  ex.addContext(ost.str());
132 
133  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
134  ost.str("");
135  ost << "Running path '";
136  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
137  ex.addContext(ost.str());
138  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
139  if(streamContext) {
140  ost.str("");
141  edm::exceptionContext(ost,*streamContext);
142  ex.addContext(ost.str());
143  }
144  } else {
145  if (imcc->type() == ParentContext::Type::kStream) {
146  ost.str("");
147  edm::exceptionContext(ost, *(imcc->streamContext()) );
148  ex.addContext(ost.str());
149  } else if(imcc->type() == ParentContext::Type::kGlobal) {
150  ost.str("");
151  edm::exceptionContext(ost, *(imcc->globalContext()) );
152  ex.addContext(ost.str());
153  }
154  }
155  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void edm::Worker::implBeginJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protectedpure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protectedpure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual void edm::Worker::implPostForkReacquireResources ( unsigned int  iChildIndex,
unsigned int  iNumberOfChildren 
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implPreForkReleaseResources ( )
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper  
)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 331 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

331  {
332  if(earlyDeleteHelper_) {
333  earlyDeleteHelper_->moduleRan(iEvent);
334  }
335  }
int iEvent
Definition: GenABIO.cc:230
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:413
void edm::Worker::postForkReacquireResources ( unsigned int  iChildIndex,
unsigned int  iNumberOfChildren 
)
inline

Definition at line 119 of file Worker.h.

Referenced by edm::Schedule::postForkReacquireResources().

119 {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
privatepure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask iTask,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 205 of file Worker.cc.

References actReg_, edm::Principal::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, mps_monitormerge::items, itemsToGetFrom(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

205  {
206  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
207  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
208 
210 
211  if(iPrincipal.branchType()==InEvent) {
212  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
213  }
214 
215  //Need to be sure the ref count isn't set to 0 immediately
216  iTask->increment_ref_count();
217  for(auto const& item : items) {
218  ProductResolverIndex productResolverIndex = item.productResolverIndex();
219  bool skipCurrentProcess = item.skipCurrentProcess();
220  if(productResolverIndex != ProductResolverIndexAmbiguous) {
221  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
222  }
223  }
224 
225  if(iPrincipal.branchType()==InEvent) {
227  }
228 
229  if(0 == iTask->decrement_ref_count()) {
230  //if everything finishes before we leave this routine, we need to launch the task
231  tbb::task::spawn(*iTask);
232  }
233  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::preForkReleaseResources ( )
inline

Definition at line 118 of file Worker.h.

Referenced by edm::Schedule::preForkReleaseResources().

virtual void implPreForkReleaseResources()=0
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper helper 
)
inline

Definition at line 120 of file Worker.h.

Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 122 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

122  {
123  cached_exception_ = std::exception_ptr();
124  state_ = Ready;
126  workStarted_ = false;
128  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:404
std::atomic< bool > workStarted_
Definition: Worker.h:416
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:409
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:415
std::atomic< State > state_
Definition: Worker.h:402
int numberOfPathsOn_
Definition: Worker.h:403
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, edm::ProductResolverIndex > const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 116 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 115 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 830 of file Worker.h.

References edm::exceptionContext(), edm::EventSetup::get(), and edm::convertException::wrap().

834  {
835  //unscheduled producers should advance this
836  //if (T::isEvent_) {
837  // ++timesVisited_;
838  //}
839  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
840  if (T::isEvent_) {
841  timesRun_.fetch_add(1,std::memory_order_relaxed);
842  }
843 
844  bool rc = true;
845  try {
847  {
848  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
849 
850  if (rc) {
851  setPassed<T::isEvent_>();
852  } else {
853  setFailed<T::isEvent_>();
854  }
855  });
856  } catch(cms::Exception& ex) {
858  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
859  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
860  assert(not cached_exception_);
861  setException<T::isEvent_>(std::current_exception());
862  std::rethrow_exception(cached_exception_);
863  } else {
864  rc = setPassed<T::isEvent_>();
865  }
866  }
867 
868  return rc;
869  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
std::exception_ptr cached_exception_
Definition: Worker.h:409
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:397
template<typename T >
void edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 648 of file Worker.h.

References edm::ModuleCallingContext::kInvalid, and edm::ModuleCallingContext::setContext().

Referenced by edm::Worker::RunModuleTask< T >::execute().

653  {
654  std::exception_ptr exceptionPtr;
655  if(iEPtr) {
656  assert(*iEPtr);
657  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
658  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
659  exceptionPtr = *iEPtr;
660  setException<T::isEvent_>(exceptionPtr);
661  } else {
662  setPassed<T::isEvent_>();
663  }
665  } else {
666  try {
667  runModule<T>(ep,es,streamID,parentContext,context);
668  } catch(...) {
669  exceptionPtr = std::current_exception();
670  }
671  }
672  waitingTasks_.doneWaiting(exceptionPtr);
673  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:415
virtual SerialTaskQueueChain* edm::Worker::serializeRunModule ( )
privatepure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry areg)

The signals are required to live longer than the last call to 'doWork' this was done to improve performance based on profiling

Definition at line 94 of file Worker.cc.

References actReg_.

94  {
95  actReg_ = areg;
96  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:411
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper iHelper)

Definition at line 235 of file Worker.cc.

References earlyDeleteHelper_.

235  {
236  earlyDeleteHelper_=iHelper;
237  }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:413
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inlineprivate

Definition at line 290 of file Worker.h.

References Exception.

290  {
291  if (IS_EVENT) {
292  timesExcept_.fetch_add(1,std::memory_order_relaxed);
293  }
294  cached_exception_ = iException; // propagate_const<T> has no reset() function
295  state_ = Exception;
296  return cached_exception_;
297  }
std::atomic< int > timesExcept_
Definition: Worker.h:401
std::exception_ptr cached_exception_
Definition: Worker.h:409
std::atomic< State > state_
Definition: Worker.h:402
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inlineprivate

Definition at line 281 of file Worker.h.

References Fail.

281  {
282  if(IS_EVENT) {
283  timesFailed_.fetch_add(1,std::memory_order_relaxed);
284  }
285  state_ = Fail;
286  return false;
287  }
std::atomic< int > timesFailed_
Definition: Worker.h:400
std::atomic< State > state_
Definition: Worker.h:402
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inlineprivate

Definition at line 272 of file Worker.h.

References Pass.

272  {
273  if(IS_EVENT) {
274  timesPassed_.fetch_add(1,std::memory_order_relaxed);
275  }
276  state_ = Pass;
277  return true;
278  }
std::atomic< State > state_
Definition: Worker.h:402
std::atomic< int > timesPassed_
Definition: Worker.h:399
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 157 of file Worker.cc.

References mps_alisetup::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

160  {
161 
162  // NOTE: the warning printed as a result of ignoring or failing
163  // a module will only be printed during the full true processing
164  // pass of this module
165 
166  // Get the action corresponding to this exception. However, if processing
167  // something other than an event (e.g. run, lumi) always rethrow.
168  if(not isEvent) {
169  return true;
170  }
171  try {
172  convertException::wrap([&]() {
173  std::rethrow_exception(iPtr);
174  });
175  } catch(cms::Exception &ex) {
177 
178  if(action == exception_actions::Rethrow) {
179  return true;
180  }
181 
182  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
183 
184  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
185  // as IgnoreCompletely, so any subsequent OutputModules are still run.
186  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
187  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
188  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
189  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
190 
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
192  action == exception_actions::FailPath) {
194  }
195  }
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
201  return true;
202  }
ModuleDescription const & description() const
Definition: Worker.h:132
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:408
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 325 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

325  {
326  if( 0 == --numberOfPathsLeftToRun_) {
328  }
329  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:404
std::exception_ptr cached_exception_
Definition: Worker.h:409
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:415
State edm::Worker::state ( ) const
inline

Definition at line 173 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

173 { return state_; }
std::atomic< State > state_
Definition: Worker.h:402
int edm::Worker::timesExcept ( ) const
inline

Definition at line 172 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

172 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:401
int edm::Worker::timesFailed ( ) const
inline

Definition at line 171 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

171 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:400
int edm::Worker::timesPass ( ) const
inline

Definition at line 175 of file Worker.h.

175 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:170
int edm::Worker::timesPassed ( ) const
inline

Definition at line 170 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

170 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:399
int edm::Worker::timesRun ( ) const
inline

Definition at line 168 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

168 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:397
int edm::Worker::timesVisited ( ) const
inline

Definition at line 169 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

169 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:398
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual std::string edm::Worker::workerType ( ) const
protectedpure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 178 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 408 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 411 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 409 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 413 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private

Definition at line 406 of file Worker.h.

Referenced by beginStream(), endStream(), prefetchAsync(), and resetModuleDescription().

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 404 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 403 of file Worker.h.

std::atomic<State> edm::Worker::state_
private

Definition at line 402 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 401 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 400 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 399 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 397 of file Worker.h.

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 398 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 415 of file Worker.h.

Referenced by skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 416 of file Worker.h.