CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::Worker Class Reference (abstract)

#include <Worker.h>

Inheritance diagram for edm::Worker:
edm::WorkerT< T >

Classes

class  RunModuleTask
 
struct  TaskQueueAdaptor
 
class  TransitionIDValue
 
class  TransitionIDValueBase
 

Public Types

enum  State { Ready, Pass, Fail, Exception }
 
enum  Types { kAnalyzer, kFilter, kProducer, kOutputModule }
 

Public Member Functions

void addedToPath ()
 
void beginJob ()
 
void beginStream (StreamID id, StreamContext &streamContext)
 
void callWhenDoneAsync (WaitingTask *task)
 
void clearCounters ()
 
virtual std::vector< ConsumesInfo > consumesInfo () const =0
 
virtual void convertCurrentProcessAlias (std::string const &processName)=0
 
ModuleDescription const * descPtr () const
 
ModuleDescription const & description () const
 
template<typename T >
bool doWork (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
void doWorkNoPrefetchingAsync (WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
void endJob ()
 
void endStream (StreamID id, StreamContext &streamContext)
 
virtual void modulesWhoseProductsAreConsumed (std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
 
virtual Types moduleType () const =0
 
Worker & operator= (Worker const &)=delete
 
void postDoEvent (EventPrincipal const &)
 
void registerThinnedAssociations (ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
 
void reset ()
 
virtual void resolvePutIndicies (BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenInputFile (FileBlock const &fb)
 
template<typename T >
std::exception_ptr runModuleDirectly (typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
void setActivityRegistry (std::shared_ptr< ActivityRegistry > areg)
 
void setEarlyDeleteHelper (EarlyDeleteHelper *iHelper)
 
void skipOnPath ()
 
State state () const
 
int timesExcept () const
 
int timesFailed () const
 
int timesPass () const
 
int timesPassed () const
 
int timesRun () const
 
int timesVisited () const
 
virtual void updateLookup (BranchType iBranchType, ProductResolverIndexHelper const &)=0
 
 Worker (ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
 
 Worker (Worker const &)=delete
 
virtual ~Worker ()
 

Protected Member Functions

ActivityRegistry * activityRegistry ()
 
virtual void implBeginJob ()=0
 
virtual void implBeginStream (StreamID)=0
 
virtual bool implDo (EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoBegin (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoEnd (LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoPrePrefetchSelection (StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamBegin (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual bool implDoStreamEnd (StreamID id, LuminosityBlockPrincipal const &lbp, EventSetup const &c, ModuleCallingContext const *mcc)=0
 
virtual void implEndJob ()=0
 
virtual void implEndStream (StreamID)=0
 
void resetModuleDescription (ModuleDescription const *)
 
virtual std::string workerType () const =0
 

Private Member Functions

void emitPostModuleEventPrefetchingSignal ()
 
virtual void implRegisterThinnedAssociations (ProductRegistry const &, ThinnedAssociationsHelper &)=0
 
virtual void implRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void implRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void itemsMayGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent () const =0
 
virtual void itemsToGet (BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
 
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom (BranchType) const =0
 
virtual void preActionBeforeRunEventAsync (WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
 
void prefetchAsync (WaitingTask *, ParentContext const &parentContext, Principal const &)
 
template<typename T >
bool runModule (typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T >
std::exception_ptr runModuleAfterAsyncPrefetch (std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
virtual TaskQueueAdaptor serializeRunModule ()=0
 
template<bool IS_EVENT>
std::exception_ptr setException (std::exception_ptr iException)
 
template<bool IS_EVENT>
bool setFailed ()
 
template<bool IS_EVENT>
bool setPassed ()
 
bool shouldRethrowException (std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
 

Static Private Member Functions

static void exceptionContext (cms::Exception &ex, ModuleCallingContext const *mcc)
 

Private Attributes

ExceptionToActionTable const * actions_
 
std::shared_ptr< ActivityRegistry > actReg_
 
std::exception_ptr cached_exception_
 
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
 
ModuleCallingContext moduleCallingContext_
 
std::atomic< int > numberOfPathsLeftToRun_
 
int numberOfPathsOn_
 
std::atomic< State > state_
 
std::atomic< int > timesExcept_
 
std::atomic< int > timesFailed_
 
std::atomic< int > timesPassed_
 
std::atomic< int > timesRun_
 
std::atomic< int > timesVisited_
 
edm::WaitingTaskList waitingTasks_
 
std::atomic< bool > workStarted_
 

Friends

template<typename O >
class workerhelper::CallImpl
 

Detailed Description

Definition at line 78 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Exception 

Definition at line 80 of file Worker.h.

Enumerator
kAnalyzer 
kFilter 
kProducer 
kOutputModule 

Definition at line 81 of file Worker.h.

Constructor & Destructor Documentation

edm::Worker::Worker ( ModuleDescription const &  iMD,
ExceptionToActionTable const *  iActions 
)

Definition at line 72 of file Worker.cc.

73  :
74  timesRun_(0),
75  timesVisited_(0),
76  timesPassed_(0),
77  timesFailed_(0),
78  timesExcept_(0),
79  state_(Ready),
83  actions_(iActions),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
std::atomic< int > timesVisited_
Definition: Worker.h:430
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:436
std::atomic< int > timesExcept_
Definition: Worker.h:433
std::atomic< bool > workStarted_
Definition: Worker.h:448
std::atomic< int > timesFailed_
Definition: Worker.h:432
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
ExceptionToActionTable const * actions_
Definition: Worker.h:440
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
std::exception_ptr cached_exception_
Definition: Worker.h:441
std::atomic< State > state_
Definition: Worker.h:434
int numberOfPathsOn_
Definition: Worker.h:435
std::atomic< int > timesPassed_
Definition: Worker.h:431
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:445
std::atomic< int > timesRun_
Definition: Worker.h:429
edm::Worker::~Worker ( )
virtual

Definition at line 91 of file Worker.cc.

91  {
92  }
edm::Worker::Worker ( Worker const &  )
delete

Member Function Documentation

ActivityRegistry* edm::Worker::activityRegistry ( )
inline protected

Definition at line 243 of file Worker.h.

References edm::exceptionContext(), and benchmark_cfg::fb.

Referenced by edm::WorkerT< T >::implDo().

243 { return actReg_.get(); }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
void edm::Worker::addedToPath ( )
inline

Definition at line 199 of file Worker.h.

Referenced by edm::WorkerInPath::WorkerInPath().

199  {
201  }
int numberOfPathsOn_
Definition: Worker.h:435
void edm::Worker::beginJob ( void  )

Definition at line 245 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

Referenced by edm::WorkerManager::beginJob(), and edm::GlobalSchedule::replaceModule().

245  {
246  try {
247  convertException::wrap([&]() {
248  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
249  implBeginJob();
250  });
251  }
252  catch(cms::Exception& ex) {
253  state_ = Exception;
254  std::ostringstream ost;
255  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
256  ex.addContext(ost.str());
257  throw;
258  }
259  }
ModuleDescription const & description() const
Definition: Worker.h:167
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:434
virtual void implBeginJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::beginStream ( StreamID  id,
StreamContext &  streamContext 
)

Definition at line 277 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implBeginStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

Referenced by edm::StreamSchedule::replaceModule().

277  {
278  try {
279  convertException::wrap([&]() {
280  streamContext.setTransition(StreamContext::Transition::kBeginStream);
281  streamContext.setEventID(EventID(0, 0, 0));
282  streamContext.setRunIndex(RunIndex::invalidRunIndex());
283  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
284  streamContext.setTimestamp(Timestamp());
285  ParentContext parentContext(&streamContext);
286  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
288  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
289  implBeginStream(id);
290  });
291  }
292  catch(cms::Exception& ex) {
293  state_ = Exception;
294  std::ostringstream ost;
295  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
296  ex.addContext(ost.str());
297  throw;
298  }
299  }
ModuleDescription const & description() const
Definition: Worker.h:167
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:434
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::callWhenDoneAsync ( WaitingTask *  task)
inline

Definition at line 144 of file Worker.h.

References bk::beginJob().

144  {
145  waitingTasks_.add(task);
146  }
void add(WaitingTask *)
Adds task to the waiting list.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:447
void edm::Worker::clearCounters ( )
inline

Definition at line 191 of file Worker.h.

Referenced by edm::StreamSchedule::clearCounters().

191  {
192  timesRun_.store(0,std::memory_order_release);
193  timesVisited_.store(0,std::memory_order_release);
194  timesPassed_.store(0,std::memory_order_release);
195  timesFailed_.store(0,std::memory_order_release);
196  timesExcept_.store(0,std::memory_order_release);
197  }
std::atomic< int > timesVisited_
Definition: Worker.h:430
std::atomic< int > timesExcept_
Definition: Worker.h:433
std::atomic< int > timesFailed_
Definition: Worker.h:432
std::atomic< int > timesPassed_
Definition: Worker.h:431
std::atomic< int > timesRun_
Definition: Worker.h:429
virtual std::vector<ConsumesInfo> edm::Worker::consumesInfo ( ) const
pure virtual
virtual void edm::Worker::convertCurrentProcessAlias ( std::string const &  processName)
pure virtual

Implemented in edm::WorkerT< T >.

ModuleDescription const* edm::Worker::descPtr ( ) const
inline

Definition at line 168 of file Worker.h.

References modifiedElectrons_cfi::processName, and AlCaHLTBitMon_QueryRunRegistry::string.

ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
ModuleDescription const * moduleDescription() const
ModuleDescription const& edm::Worker::description ( ) const
inline
template<typename T >
bool edm::Worker::doWork ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 747 of file Worker.h.

References Fail, edm::ServiceRegistry::instance(), edm::ModuleCallingContext::kInvalid, edm::ModuleCallingContext::kPrefetching, edm::make_empty_waiting_task(), Pass, edm::ServiceRegistry::presentToken(), mps_fire::queue, Ready, and edm::ModuleCallingContext::setContext().

751  {
752 
753  if (T::isEvent_) {
754  timesVisited_.fetch_add(1,std::memory_order_relaxed);
755  }
756  bool rc = false;
757 
758  switch(state_) {
759  case Ready: break;
760  case Pass: return true;
761  case Fail: return false;
762  case Exception: {
763  std::rethrow_exception(cached_exception_);
764  }
765  }
766 
767  bool expected = false;
768  if(not workStarted_.compare_exchange_strong(expected, true) ) {
769  //another thread beat us here
770  auto waitTask = edm::make_empty_waiting_task();
771  waitTask->increment_ref_count();
772 
773  waitingTasks_.add(waitTask.get());
774 
775  waitTask->wait_for_all();
776 
777  switch(state_) {
778  case Ready: assert(false);
779  case Pass: return true;
780  case Fail: return false;
781  case Exception: {
782  std::rethrow_exception(cached_exception_);
783  }
784  }
785  }
786 
787  //Need the context to be set until after any exception is resolved
789 
790  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
791  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
792 
793  if (T::isEvent_) {
794  try {
795  //if have TriggerResults based selection we want to reject the event before doing prefetching
796  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
797  timesRun_.fetch_add(1,std::memory_order_relaxed);
798  rc = setPassed<T::isEvent_>();
799  waitingTasks_.doneWaiting(nullptr);
800  return true;
801  }
802  }catch(...) {}
803  auto waitTask = edm::make_empty_waiting_task();
804  {
805  //Make sure signal is sent once the prefetching is done
806  // [the 'pre' signal was sent in prefetchAsync]
807  //The purpose of this block is to send the signal after wait_for_all
808  auto sentryFunc = [this](void*) {
810  };
811  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
812 
813  //set count to 2 since wait_for_all requires value to not go to 0
814  waitTask->set_ref_count(2);
815 
816  prefetchAsync(waitTask.get(),parentContext, ep);
817  waitTask->decrement_ref_count();
818  waitTask->wait_for_all();
819  }
820  if(waitTask->exceptionPtr() != nullptr) {
821  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
822  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
823  setException<T::isEvent_>(*waitTask->exceptionPtr());
825  std::rethrow_exception(cached_exception_);
826  } else {
827  setPassed<T::isEvent_>();
828  waitingTasks_.doneWaiting(nullptr);
829  return true;
830  }
831  }
832  }
833 
834  //successful prefetch so no reset necessary
835  prefetchSentry.release();
836  if(auto queue = serializeRunModule()) {
837  auto serviceToken = ServiceRegistry::instance().presentToken();
838  queue.pushAndWait([&]() {
839  //Need to make the services available
840  ServiceRegistry::Operate guard(serviceToken);
841  try {
842  rc = runModule<T>(ep,es,streamID,parentContext,context);
843  } catch(...) {
844  }
845  });
846  } else {
847  try {
848  rc = runModule<T>(ep,es,streamID,parentContext,context);
849  } catch(...) {
850  }
851  }
852  if(state_ == Exception) {
854  std::rethrow_exception(cached_exception_);
855  }
856 
857  waitingTasks_.doneWaiting(nullptr);
858  return rc;
859  }
std::atomic< int > timesVisited_
Definition: Worker.h:430
std::atomic< bool > workStarted_
Definition: Worker.h:448
void add(WaitingTask *)
Adds task to the waiting list.
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
std::exception_ptr cached_exception_
Definition: Worker.h:441
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:447
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
static ServiceRegistry & instance()
std::atomic< State > state_
Definition: Worker.h:434
virtual TaskQueueAdaptor serializeRunModule()=0
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:335
std::atomic< int > timesRun_
Definition: Worker.h:429
template<typename T >
void edm::Worker::doWorkAsync ( WaitingTask *  task,
typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 649 of file Worker.h.

References edm::ModuleCallingContext::kPrefetching, and edm::ModuleCallingContext::setContext().

Referenced by edm::WorkerInPath::runWorkerAsync().

654  {
655  waitingTasks_.add(task);
656  if(T::isEvent_) {
657  timesVisited_.fetch_add(1,std::memory_order_relaxed);
658  }
659 
660  bool expected = false;
661  if(workStarted_.compare_exchange_strong(expected,true)) {
663 
664  //if have TriggerResults based selection we want to reject the event before doing prefetching
665  try {
666  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
667  setPassed<T::isEvent_>();
668  waitingTasks_.doneWaiting(nullptr);
669  return;
670  }
671  }catch(...) {}
672 
673  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
674  this, ep,es,streamID,parentContext,context);
675  prefetchAsync(runTask, parentContext, ep);
676  }
677  }
std::atomic< int > timesVisited_
Definition: Worker.h:430
std::atomic< bool > workStarted_
Definition: Worker.h:448
void add(WaitingTask *)
Adds task to the waiting list.
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:447
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
template<typename T >
void edm::Worker::doWorkNoPrefetchingAsync ( WaitingTask *  task,
typename T::MyPrincipal const &  principal,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 709 of file Worker.h.

References edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::ServiceRegistry::presentToken(), cmsRelvalreport::principal(), and mps_fire::queue.

Referenced by edm::WorkerInPath::runWorkerAsync().

714  {
715  waitingTasks_.add(task);
716  bool expected = false;
717  if(workStarted_.compare_exchange_strong(expected,true)) {
718  auto serviceToken = ServiceRegistry::instance().presentToken();
719 
720  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
721  {
722  std::exception_ptr exceptionPtr;
723  try {
724  //Need to make the services available
725  ServiceRegistry::Operate guard(serviceToken);
726 
727  this->runModule<T>(principal,
728  es,
729  streamID,
730  parentContext,
731  context);
732  } catch( ... ) {
733  exceptionPtr = std::current_exception();
734  }
735  this->waitingTasks_.doneWaiting(exceptionPtr);
736  };
737  if(auto queue = this->serializeRunModule()) {
738  queue.push( toDo);
739  } else {
740  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
741  tbb::task::spawn(*task);
742  }
743  }
744  }
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< bool > workStarted_
Definition: Worker.h:448
void add(WaitingTask *)
Adds task to the waiting list.
ServiceToken presentToken() const
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
def principal(options)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:447
static ServiceRegistry & instance()
virtual TaskQueueAdaptor serializeRunModule()=0
void edm::Worker::emitPostModuleEventPrefetchingSignal ( )
inline private

Definition at line 335 of file Worker.h.

335  {
336  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
337  }
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
void edm::Worker::endJob ( void  )

Definition at line 261 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndJob(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), state_, and edm::convertException::wrap().

261  {
262  try {
263  convertException::wrap([&]() {
264  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
265  implEndJob();
266  });
267  }
268  catch(cms::Exception& ex) {
269  state_ = Exception;
270  std::ostringstream ost;
271  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
272  ex.addContext(ost.str());
273  throw;
274  }
275  }
ModuleDescription const & description() const
Definition: Worker.h:167
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
std::string const & moduleName() const
std::string const & moduleLabel() const
std::atomic< State > state_
Definition: Worker.h:434
virtual void implEndJob()=0
void addContext(std::string const &context)
Definition: Exception.cc:227
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::endStream ( StreamID  id,
StreamContext &  streamContext 
)

Definition at line 301 of file Worker.cc.

References actReg_, cms::Exception::addContext(), description(), Exception, implEndStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::ModuleCallingContext::kRunning, moduleCallingContext_, edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::ModuleCallingContext::setState(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), state_, and edm::convertException::wrap().

301  {
302  try {
303  convertException::wrap([&]() {
304  streamContext.setTransition(StreamContext::Transition::kEndStream);
305  streamContext.setEventID(EventID(0, 0, 0));
306  streamContext.setRunIndex(RunIndex::invalidRunIndex());
307  streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
308  streamContext.setTimestamp(Timestamp());
309  ParentContext parentContext(&streamContext);
310  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
312  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
313  implEndStream(id);
314  });
315  }
316  catch(cms::Exception& ex) {
317  state_ = Exception;
318  std::ostringstream ost;
319  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
320  ex.addContext(ost.str());
321  throw;
322  }
323  }
ModuleDescription const & description() const
Definition: Worker.h:167
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
std::string const & moduleName() const
std::string const & moduleLabel() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
std::atomic< State > state_
Definition: Worker.h:434
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implEndStream(StreamID)=0
auto wrap(F iFunc) -> decltype(iFunc())
void edm::Worker::exceptionContext ( cms::Exception &  ex,
ModuleCallingContext const *  mcc 
)
static private

Definition at line 99 of file Worker.cc.

References cms::Exception::addContext(), edm::exceptionContext(), edm::ModuleCallingContext::globalContext(), edm::ModuleCallingContext::internalContext(), edm::ParentContext::kGlobal, edm::ParentContext::kInternal, edm::ParentContext::kModule, edm::ParentContext::kPlaceInPath, edm::ModuleCallingContext::kPrefetching, edm::ParentContext::kStream, edm::InternalContext::moduleCallingContext(), edm::ModuleCallingContext::moduleCallingContext(), edm::ModuleCallingContext::moduleDescription(), edm::ModuleDescription::moduleLabel(), edm::ModuleDescription::moduleName(), or, edm::PlaceInPathContext::pathContext(), edm::PathContext::pathName(), edm::ModuleCallingContext::placeInPathContext(), edm::ModuleCallingContext::state(), edm::PathContext::streamContext(), edm::ModuleCallingContext::streamContext(), and edm::ModuleCallingContext::type().

101  {
102 
103  ModuleCallingContext const* imcc = mcc;
104  while( (imcc->type() == ParentContext::Type::kModule) or
105  (imcc->type() == ParentContext::Type::kInternal) ) {
106  std::ostringstream iost;
107  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
108  iost << "Prefetching for module ";
109  } else {
110  iost << "Calling method for module ";
111  }
112  iost << imcc->moduleDescription()->moduleName() << "/'"
113  << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if(imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
124  if( imcc->state() == ModuleCallingContext::State::kPrefetching ) {
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'"
130  << imcc->moduleDescription()->moduleLabel() << "'";
131  ex.addContext(ost.str());
132 
133  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
134  ost.str("");
135  ost << "Running path '";
136  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
137  ex.addContext(ost.str());
138  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
139  if(streamContext) {
140  ost.str("");
141  edm::exceptionContext(ost,*streamContext);
142  ex.addContext(ost.str());
143  }
144  } else {
145  if (imcc->type() == ParentContext::Type::kStream) {
146  ost.str("");
147  edm::exceptionContext(ost, *(imcc->streamContext()) );
148  ex.addContext(ost.str());
149  } else if(imcc->type() == ParentContext::Type::kGlobal) {
150  ost.str("");
151  edm::exceptionContext(ost, *(imcc->globalContext()) );
152  ex.addContext(ost.str());
153  }
154  }
155  }
void exceptionContext(std::ostream &, GlobalContext const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void edm::Worker::implBeginJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginJob().

virtual void edm::Worker::implBeginStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by beginStream().

virtual bool edm::Worker::implDo ( EventPrincipal const &  ,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual
virtual bool edm::Worker::implDoBegin ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual
virtual bool edm::Worker::implDoBegin ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoEnd ( RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual
virtual bool edm::Worker::implDoEnd ( LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoPrePrefetchSelection ( StreamID  id,
EventPrincipal const &  ep,
ModuleCallingContext const *  mcc 
)
protected, pure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual
virtual bool edm::Worker::implDoStreamBegin ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
RunPrincipal const &  rp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual
virtual bool edm::Worker::implDoStreamEnd ( StreamID  id,
LuminosityBlockPrincipal const &  lbp,
EventSetup const &  c,
ModuleCallingContext const *  mcc 
)
protected, pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implEndJob ( )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endJob().

virtual void edm::Worker::implEndStream ( StreamID  )
protected, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by endStream().

virtual void edm::Worker::implRegisterThinnedAssociations ( ProductRegistry const &  ,
ThinnedAssociationsHelper & 
)
private, pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToCloseInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::implRespondToOpenInputFile ( FileBlock const &  fb)
private, pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsMayGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndex> const& edm::Worker::itemsShouldPutInEvent ( ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

virtual void edm::Worker::itemsToGet ( BranchType  ,
std::vector< ProductResolverIndexAndSkipBit > &   
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

virtual std::vector<ProductResolverIndexAndSkipBit> const& edm::Worker::itemsToGetFrom ( BranchType  ) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

virtual void edm::Worker::modulesWhoseProductsAreConsumed ( std::vector< ModuleDescription const * > &  modules,
ProductRegistry const &  preg,
std::map< std::string, ModuleDescription const * > const &  labelsToDesc 
) const
pure virtual

Implemented in edm::WorkerT< T >.

virtual Types edm::Worker::moduleType ( ) const
pure virtual
Worker& edm::Worker::operator= ( Worker const &  )
delete
void edm::Worker::postDoEvent ( EventPrincipal const &  iEvent)

Definition at line 331 of file Worker.cc.

References earlyDeleteHelper_.

Referenced by edm::WorkerT< T >::implDo().

331  {
332  if(earlyDeleteHelper_) {
333  earlyDeleteHelper_->moduleRan(iEvent);
334  }
335  }
int iEvent
Definition: GenABIO.cc:230
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:445
virtual void edm::Worker::preActionBeforeRunEventAsync ( WaitingTask * iTask,
ModuleCallingContext const &  moduleCallingContext,
Principal const &  iPrincipal 
) const
private, pure virtual

Implemented in edm::WorkerT< T >.

Referenced by prefetchAsync().

void edm::Worker::prefetchAsync ( WaitingTask * iTask,
ParentContext const &  parentContext,
Principal const &  iPrincipal 
)
private

Definition at line 205 of file Worker.cc.

References actReg_, edm::Principal::branchType(), edm::ModuleCallingContext::getStreamContext(), edm::InEvent, mps_monitormerge::items, itemsToGetFrom(), edm::ModuleCallingContext::kPrefetching, moduleCallingContext_, preActionBeforeRunEventAsync(), edm::Principal::prefetchAsync(), edm::ProductResolverIndexAmbiguous, and edm::ModuleCallingContext::setContext().

205  {
206  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
207  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
208 
210 
211  if(iPrincipal.branchType()==InEvent) {
212  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
213  }
214 
215  //Need to be sure the ref count isn't set to 0 immediately
216  iTask->increment_ref_count();
217  for(auto const& item : items) {
218  ProductResolverIndex productResolverIndex = item.productResolverIndex();
219  bool skipCurrentProcess = item.skipCurrentProcess();
220  if(productResolverIndex != ProductResolverIndexAmbiguous) {
221  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
222  }
223  }
224 
225  if(iPrincipal.branchType()==InEvent) {
227  }
228 
229  if(0 == iTask->decrement_ref_count()) {
230  //if everything finishes before we leave this routine, we need to launch the task
231  tbb::task::spawn(*iTask);
232  }
233  }
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
void edm::Worker::registerThinnedAssociations ( ProductRegistry const &  registry,
ThinnedAssociationsHelper & helper 
)
inline

Definition at line 155 of file Worker.h.

Definition: helper.py:1
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static Interceptor::Registry registry("Interceptor")
void edm::Worker::reset ( void  )
inline

Definition at line 157 of file Worker.h.

References Ready.

Referenced by edm::WorkerManager::resetAll().

157  {
158  cached_exception_ = std::exception_ptr();
159  state_ = Ready;
161  workStarted_ = false;
163  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:436
std::atomic< bool > workStarted_
Definition: Worker.h:448
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr cached_exception_
Definition: Worker.h:441
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:447
std::atomic< State > state_
Definition: Worker.h:434
int numberOfPathsOn_
Definition: Worker.h:435
void edm::Worker::resetModuleDescription ( ModuleDescription const *  iDesc)
protected
virtual void edm::Worker::resolvePutIndicies ( BranchType  iBranchType,
std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &  iIndicies 
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

void edm::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inline

Definition at line 153 of file Worker.h.

Referenced by edm::Schedule::respondToCloseInputFile().

virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
void edm::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inline

Definition at line 152 of file Worker.h.

Referenced by edm::Schedule::respondToOpenInputFile().

virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
template<typename T >
bool edm::Worker::runModule ( typename T::MyPrincipal const &  ep,
EventSetup const &  c,
StreamID  stream,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 863 of file Worker.h.

References edm::exceptionContext(), edm::EventSetup::get(), and edm::convertException::wrap().

867  {
868  //unscheduled producers should advance this
869  //if (T::isEvent_) {
870  // ++timesVisited_;
871  //}
872  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
873  if (T::isEvent_) {
874  timesRun_.fetch_add(1,std::memory_order_relaxed);
875  }
876 
877  bool rc = true;
878  try {
880  {
881  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
882 
883  if (rc) {
884  setPassed<T::isEvent_>();
885  } else {
886  setFailed<T::isEvent_>();
887  }
888  });
889  } catch(cms::Exception& ex) {
891  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
892  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
893  assert(not cached_exception_);
894  setException<T::isEvent_>(std::current_exception());
895  std::rethrow_exception(cached_exception_);
896  } else {
897  rc = setPassed<T::isEvent_>();
898  }
899  }
900 
901  return rc;
902  }
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
std::exception_ptr cached_exception_
Definition: Worker.h:441
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
auto wrap(F iFunc) -> decltype(iFunc())
std::atomic< int > timesRun_
Definition: Worker.h:429
template<typename T >
std::exception_ptr edm::Worker::runModuleAfterAsyncPrefetch ( std::exception_ptr const *  iEPtr,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)
private

Definition at line 680 of file Worker.h.

References edm::ModuleCallingContext::kInvalid, and edm::ModuleCallingContext::setContext().

Referenced by edm::Worker::RunModuleTask< T >::execute().

685  {
686  std::exception_ptr exceptionPtr;
687  if(iEPtr) {
688  assert(*iEPtr);
689  TransitionIDValue<typename T::MyPrincipal> idValue(ep);
690  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
691  exceptionPtr = *iEPtr;
692  setException<T::isEvent_>(exceptionPtr);
693  } else {
694  setPassed<T::isEvent_>();
695  }
697  } else {
698  try {
699  runModule<T>(ep,es,streamID,parentContext,context);
700  } catch(...) {
701  exceptionPtr = std::current_exception();
702  }
703  }
704  waitingTasks_.doneWaiting(exceptionPtr);
705  return exceptionPtr;
706  }
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:447
template<typename T >
std::exception_ptr edm::Worker::runModuleDirectly ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 905 of file Worker.h.

Referenced by edm::Path::finished().

909  {
910 
911  timesVisited_.fetch_add(1,std::memory_order_relaxed);
912  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
913  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
914  }
std::atomic< int > timesVisited_
Definition: Worker.h:430
virtual TaskQueueAdaptor edm::Worker::serializeRunModule ( )
private, pure virtual
void edm::Worker::setActivityRegistry ( std::shared_ptr< ActivityRegistry > areg)

The signals are required to live longer than the last call to 'doWork'; this was done to improve performance based on profiling.

Definition at line 94 of file Worker.cc.

References actReg_.

94  {
95  actReg_ = areg;
96  }
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:443
void edm::Worker::setEarlyDeleteHelper ( EarlyDeleteHelper * iHelper)

Definition at line 235 of file Worker.cc.

References earlyDeleteHelper_.

235  {
236  earlyDeleteHelper_=iHelper;
237  }
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:445
template<bool IS_EVENT>
std::exception_ptr edm::Worker::setException ( std::exception_ptr  iException)
inline, private

Definition at line 322 of file Worker.h.

References Exception.

322  {
323  if (IS_EVENT) {
324  timesExcept_.fetch_add(1,std::memory_order_relaxed);
325  }
326  cached_exception_ = iException; // propagate_const<T> has no reset() function
327  state_ = Exception;
328  return cached_exception_;
329  }
std::atomic< int > timesExcept_
Definition: Worker.h:433
std::exception_ptr cached_exception_
Definition: Worker.h:441
std::atomic< State > state_
Definition: Worker.h:434
template<bool IS_EVENT>
bool edm::Worker::setFailed ( )
inline, private

Definition at line 313 of file Worker.h.

References Fail.

313  {
314  if(IS_EVENT) {
315  timesFailed_.fetch_add(1,std::memory_order_relaxed);
316  }
317  state_ = Fail;
318  return false;
319  }
std::atomic< int > timesFailed_
Definition: Worker.h:432
std::atomic< State > state_
Definition: Worker.h:434
template<bool IS_EVENT>
bool edm::Worker::setPassed ( )
inline, private

Definition at line 304 of file Worker.h.

References Pass.

304  {
305  if(IS_EVENT) {
306  timesPassed_.fetch_add(1,std::memory_order_relaxed);
307  }
308  state_ = Pass;
309  return true;
310  }
std::atomic< State > state_
Definition: Worker.h:434
std::atomic< int > timesPassed_
Definition: Worker.h:431
bool edm::Worker::shouldRethrowException ( std::exception_ptr  iPtr,
ParentContext const &  parentContext,
bool  isEvent,
TransitionIDValueBase const &  iID 
) const
private

Definition at line 157 of file Worker.cc.

References mps_alisetup::action, actions_, cms::Exception::category(), description(), edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::ModuleCallingContext::getTopModuleCallingContext(), edm::exception_actions::IgnoreCompletely, edm::PathContext::isEndPath(), edm::ModuleCallingContext::kInvalid, edm::ParentContext::kPlaceInPath, edm::PlaceInPathContext::pathContext(), edm::ModuleCallingContext::placeInPathContext(), edm::printCmsExceptionWarning(), edm::exception_actions::Rethrow, edm::exception_actions::SkipEvent, edm::ModuleCallingContext::type(), and edm::convertException::wrap().

160  {
161 
162  // NOTE: the warning printed as a result of ignoring or failing
163  // a module will only be printed during the full true processing
164  // pass of this module
165 
166  // Get the action corresponding to this exception. However, if processing
167  // something other than an event (e.g. run, lumi) always rethrow.
168  if(not isEvent) {
169  return true;
170  }
171  try {
172  convertException::wrap([&]() {
173  std::rethrow_exception(iPtr);
174  });
175  } catch(cms::Exception &ex) {
177 
178  if(action == exception_actions::Rethrow) {
179  return true;
180  }
181 
182  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
183 
184  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
185  // as IgnoreCompletely, so any subsequent OutputModules are still run.
186  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
187  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
188  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
189  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
190 
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
192  action == exception_actions::FailPath) {
194  }
195  }
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
201  return true;
202  }
ModuleDescription const & description() const
Definition: Worker.h:167
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ExceptionToActionTable const * actions_
Definition: Worker.h:440
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void edm::Worker::skipOnPath ( )

Definition at line 325 of file Worker.cc.

References cached_exception_, edm::WaitingTaskList::doneWaiting(), numberOfPathsLeftToRun_, and waitingTasks_.

Referenced by edm::WorkerInPath::skipWorker().

325  {
326  if( 0 == --numberOfPathsLeftToRun_) {
328  }
329  }
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:436
std::exception_ptr cached_exception_
Definition: Worker.h:441
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:447
State edm::Worker::state ( ) const
inline

Definition at line 208 of file Worker.h.

Referenced by edm::WorkerInPath::checkResultsOfRunWorker().

208 { return state_; }
std::atomic< State > state_
Definition: Worker.h:434
int edm::Worker::timesExcept ( ) const
inline

Definition at line 207 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

207 { return timesExcept_.load(std::memory_order_acquire); }
std::atomic< int > timesExcept_
Definition: Worker.h:433
int edm::Worker::timesFailed ( ) const
inline

Definition at line 206 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

206 { return timesFailed_.load(std::memory_order_acquire); }
std::atomic< int > timesFailed_
Definition: Worker.h:432
int edm::Worker::timesPass ( ) const
inline

Definition at line 210 of file Worker.h.

210 { return timesPassed(); } // for backward compatibility only - to be removed soon
int timesPassed() const
Definition: Worker.h:205
int edm::Worker::timesPassed ( ) const
inline

Definition at line 205 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

205 { return timesPassed_.load(std::memory_order_acquire); }
std::atomic< int > timesPassed_
Definition: Worker.h:431
int edm::Worker::timesRun ( ) const
inline

Definition at line 203 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

203 { return timesRun_.load(std::memory_order_acquire); }
std::atomic< int > timesRun_
Definition: Worker.h:429
int edm::Worker::timesVisited ( ) const
inline

Definition at line 204 of file Worker.h.

Referenced by edm::fillWorkerSummaryAux().

204 { return timesVisited_.load(std::memory_order_acquire); }
std::atomic< int > timesVisited_
Definition: Worker.h:430
virtual void edm::Worker::updateLookup ( BranchType  iBranchType,
ProductResolverIndexHelper const &   
)
pure virtual

Implemented in edm::WorkerT< T >.

Referenced by edm::Schedule::changeModule().

virtual std::string edm::Worker::workerType ( ) const
protected, pure virtual

Implemented in edm::WorkerT< T >.

Friends And Related Function Documentation

template<typename O >
friend class workerhelper::CallImpl
friend

Definition at line 213 of file Worker.h.

Member Data Documentation

ExceptionToActionTable const* edm::Worker::actions_
private

Definition at line 440 of file Worker.h.

Referenced by shouldRethrowException().

std::shared_ptr<ActivityRegistry> edm::Worker::actReg_
private

Definition at line 443 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), endStream(), prefetchAsync(), and setActivityRegistry().

std::exception_ptr edm::Worker::cached_exception_
private

Definition at line 441 of file Worker.h.

Referenced by skipOnPath().

edm::propagate_const<EarlyDeleteHelper*> edm::Worker::earlyDeleteHelper_
private

Definition at line 445 of file Worker.h.

Referenced by postDoEvent(), and setEarlyDeleteHelper().

ModuleCallingContext edm::Worker::moduleCallingContext_
private

Definition at line 438 of file Worker.h.

Referenced by beginStream(), endStream(), prefetchAsync(), and resetModuleDescription().

std::atomic<int> edm::Worker::numberOfPathsLeftToRun_
private

Definition at line 436 of file Worker.h.

Referenced by skipOnPath().

int edm::Worker::numberOfPathsOn_
private

Definition at line 435 of file Worker.h.

std::atomic<State> edm::Worker::state_
private

Definition at line 434 of file Worker.h.

Referenced by beginJob(), beginStream(), endJob(), and endStream().

std::atomic<int> edm::Worker::timesExcept_
private

Definition at line 433 of file Worker.h.

std::atomic<int> edm::Worker::timesFailed_
private

Definition at line 432 of file Worker.h.

std::atomic<int> edm::Worker::timesPassed_
private

Definition at line 431 of file Worker.h.

std::atomic<int> edm::Worker::timesRun_
private

Definition at line 429 of file Worker.h.

std::atomic<int> edm::Worker::timesVisited_
private

Definition at line 430 of file Worker.h.

edm::WaitingTaskList edm::Worker::waitingTasks_
private

Definition at line 447 of file Worker.h.

Referenced by skipOnPath().

std::atomic<bool> edm::Worker::workStarted_
private

Definition at line 448 of file Worker.h.