CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Attributes
edm::WorkerManager Class Reference

#include <WorkerManager.h>

Public Types

typedef std::vector< Worker * > AllWorkers
 

Public Member Functions

ExceptionToActionTable const & actionTable () const
 
void addToAllWorkers (Worker *w)
 
void addToUnscheduledWorkers (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
 
AllWorkers const & allWorkers () const
 
void beginJob (ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, GlobalContext const &)
 
void beginStream (StreamID, StreamContext const &)
 
void deleteModuleIfExists (std::string const &moduleLabel)
 
void endJob (ExceptionCollector &, GlobalContext const &)
 
void endStream (StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
 
Worker * getWorker (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
 
template<typename T >
void processAccumulatorsAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T , typename U >
void processOneOccurrenceAsync (WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context) noexcept
 
void resetAll ()
 
void setupOnDemandSystem (EventTransitionInfo const &)
 
void setupResolvers (Principal &principal)
 
AllWorkers const & unscheduledWorkers () const
 
 WorkerManager (std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions, ModuleTypeResolverMaker const *typeResolverMaker)
 
 WorkerManager (WorkerManager &&)=default
 
 WorkerManager (std::shared_ptr< ModuleRegistry > modReg, std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
 

Private Attributes

ExceptionToActionTable const * actionTable_
 
AllWorkers allWorkers_
 
void const * lastSetupEventPrincipal_
 
UnscheduledCallProducer unscheduled_
 
WorkerRegistry workerReg_
 

Detailed Description

Definition at line 31 of file WorkerManager.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::WorkerManager::AllWorkers

Definition at line 33 of file WorkerManager.h.

Constructor & Destructor Documentation

◆ WorkerManager() [1/3]

edm::WorkerManager::WorkerManager ( std::shared_ptr< ActivityRegistry >  actReg,
ExceptionToActionTable const &  actions,
ModuleTypeResolverMaker const *  typeResolverMaker 
)

Definition at line 23 of file WorkerManager.cc.

26  : workerReg_(areg, typeResolverMaker),
27  actionTable_(&actions),
28  allWorkers_(),
29  unscheduled_(*areg),
30  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
roAction_t actions[nactions]
Definition: GenABIO.cc:181
UnscheduledCallProducer unscheduled_
AllWorkers allWorkers_
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:99
void const * lastSetupEventPrincipal_
WorkerRegistry workerReg_
Definition: WorkerManager.h:98

◆ WorkerManager() [2/3]

edm::WorkerManager::WorkerManager ( WorkerManager &&  )
default

◆ WorkerManager() [3/3]

edm::WorkerManager::WorkerManager ( std::shared_ptr< ModuleRegistry >  modReg,
std::shared_ptr< ActivityRegistry >  actReg,
ExceptionToActionTable const &  actions 
)

Definition at line 32 of file WorkerManager.cc.

35  : workerReg_(areg, modReg),
36  actionTable_(&actions),
37  allWorkers_(),
38  unscheduled_(*areg),
39  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
roAction_t actions[nactions]
Definition: GenABIO.cc:181
UnscheduledCallProducer unscheduled_
AllWorkers allWorkers_
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:99
void const * lastSetupEventPrincipal_
WorkerRegistry workerReg_
Definition: WorkerManager.h:98

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::WorkerManager::actionTable ( ) const
inline

Definition at line 87 of file WorkerManager.h.

References actionTable_.

Referenced by edm::StreamSchedule::actionTable().

87 { return *actionTable_; }
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:99

◆ addToAllWorkers()

void edm::WorkerManager::addToAllWorkers ( Worker *  w)

Definition at line 167 of file WorkerManager.cc.

References allWorkers_, edm::search_all(), and w().

Referenced by edm::StreamSchedule::addToAllWorkers(), addToUnscheduledWorkers(), and edm::StreamSchedule::StreamSchedule().

167  {
168  if (!search_all(allWorkers_, w)) {
169  allWorkers_.push_back(w);
170  }
171  }
T w() const
AllWorkers allWorkers_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36

◆ addToUnscheduledWorkers()

void edm::WorkerManager::addToUnscheduledWorkers ( ParameterSet &  pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string  label,
std::set< std::string > &  unscheduledLabels,
std::vector< std::string > &  shouldBeUsedLabels 
)

Definition at line 60 of file WorkerManager.cc.

References addToAllWorkers(), edm::UnscheduledCallProducer::addWorker(), cms::cuda::assert(), getWorker(), edm::Worker::kFilter, kFilterType(), edm::Worker::kProducer, kProducerType(), label, edm::Worker::moduleType(), muonDTDigis_cfi::pset, AlCaHLTBitMon_QueryRunRegistry::string, and unscheduled_.

Referenced by edm::SecondaryEventProvider::SecondaryEventProvider(), and edm::StreamSchedule::StreamSchedule().

66  {
67  //Need to
68  // 1) create worker
69  // 2) if it is a WorkerT<EDProducer>, add it to our list
70  auto modType = pset.getParameter<std::string>("@module_edm_type");
71  if (modType == kProducerType || modType == kFilterType) {
72  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
73  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
74  unscheduledLabels.insert(label);
75  unscheduled_.addWorker(newWorker);
76  //add to list so it gets reset each new event
77  addToAllWorkers(newWorker);
78  } else {
79  shouldBeUsedLabels.push_back(label);
80  }
81  }
static const std::string kFilterType("EDFilter")
UnscheduledCallProducer unscheduled_
assert(be >=bs)
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
char const * label
static const std::string kProducerType("EDProducer")
void addToAllWorkers(Worker *w)

◆ allWorkers()

AllWorkers const& edm::WorkerManager::allWorkers ( ) const
inline

◆ beginJob()

void edm::WorkerManager::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProductResolverIndices const &  iESIndices,
ProcessBlockHelperBase const &  processBlockHelperBase,
GlobalContext const &  globalContext 
)

Definition at line 83 of file WorkerManager.cc.

References allWorkers_, CMS_SA_ALLOW, edm::InEvent, edm::InLumi, edm::InProcess, edm::InRun, SimL1EmulatorRepack_CalouGT_cff::processName, and edm::ProductRegistry::productLookup().

Referenced by edm::SecondaryEventProvider::beginJob().

86  {
87  std::exception_ptr exceptionPtr;
88  CMS_SA_ALLOW try {
89  auto const processBlockLookup = iRegistry.productLookup(InProcess);
90  auto const runLookup = iRegistry.productLookup(InRun);
91  auto const lumiLookup = iRegistry.productLookup(InLumi);
92  auto const eventLookup = iRegistry.productLookup(InEvent);
93  if (!allWorkers_.empty()) {
94  auto const& processName = allWorkers_[0]->description()->processName();
95  auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
96  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
97  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
98  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
99  for (auto& worker : allWorkers_) {
100  worker->updateLookup(InProcess, *processBlockLookup);
101  worker->updateLookup(InRun, *runLookup);
102  worker->updateLookup(InLumi, *lumiLookup);
103  worker->updateLookup(InEvent, *eventLookup);
104  worker->updateLookup(iESIndices);
105  worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
106  worker->resolvePutIndicies(InRun, runModuleToIndicies);
107  worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
108  worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
109  worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
110  }
111  }
112  } catch (...) {
113  exceptionPtr = std::current_exception();
114  }
115 
116  for (auto& worker : allWorkers_) {
117  CMS_SA_ALLOW try { worker->beginJob(globalContext); } catch (...) {
118  if (!exceptionPtr) {
119  exceptionPtr = std::current_exception();
120  }
121  }
122  }
123  if (exceptionPtr) {
124  std::rethrow_exception(exceptionPtr);
125  }
126  }
#define CMS_SA_ALLOW
AllWorkers allWorkers_

◆ beginStream()

void edm::WorkerManager::beginStream ( StreamID  streamID,
StreamContext const &  streamContext 
)

Definition at line 138 of file WorkerManager.cc.

References allWorkers_, and CMS_SA_ALLOW.

Referenced by edm::SecondaryEventProvider::beginStream(), and edm::StreamSchedule::beginStream().

138  {
139  std::exception_ptr exceptionPtr;
140  for (auto& worker : allWorkers_) {
141  CMS_SA_ALLOW try { worker->beginStream(streamID, streamContext); } catch (...) {
142  if (!exceptionPtr) {
143  exceptionPtr = std::current_exception();
144  }
145  }
146  }
147  if (exceptionPtr) {
148  std::rethrow_exception(exceptionPtr);
149  }
150  }
#define CMS_SA_ALLOW
AllWorkers allWorkers_

◆ deleteModuleIfExists()

void edm::WorkerManager::deleteModuleIfExists ( std::string const &  moduleLabel)

Definition at line 41 of file WorkerManager.cc.

References allWorkers_, edm::WorkerRegistry::deleteModule(), edm::WorkerRegistry::get(), HerwigMaxPtPartonFilter_cfi::moduleLabel, MatrixUtil::remove(), edm::UnscheduledCallProducer::removeWorker(), unscheduled_, and workerReg_.

Referenced by edm::StreamSchedule::deleteModule().

41  {
42  auto worker = workerReg_.get(moduleLabel);
43  if (worker != nullptr) {
44  auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
45  allWorkers_.erase(eraseBeg, allWorkers_.end());
46  unscheduled_.removeWorker(worker);
47  workerReg_.deleteModule(moduleLabel);
48  }
49  }
void removeWorker(Worker const *worker)
UnscheduledCallProducer unscheduled_
AllWorkers allWorkers_
Worker const * get(std::string const &moduleLabel) const
void deleteModule(std::string const &moduleLabel)
Deletes the module of the Worker, but the Worker continues to exist.
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:234
WorkerRegistry workerReg_
Definition: WorkerManager.h:98

◆ endJob()

void edm::WorkerManager::endJob ( ExceptionCollector &  collector,
GlobalContext const &  globalContext 
)

Definition at line 128 of file WorkerManager.cc.

References edm::ExceptionCollector::addException(), allWorkers_, and edm::convertException::wrap().

Referenced by edm::SecondaryEventProvider::endJob().

128  {
129  for (auto& worker : allWorkers_) {
130  try {
131  convertException::wrap([&worker, &globalContext]() { worker->endJob(globalContext); });
132  } catch (cms::Exception const& ex) {
133  collector.addException(ex);
134  }
135  }
136  }
AllWorkers allWorkers_
auto wrap(F iFunc) -> decltype(iFunc())

◆ endStream()

void edm::WorkerManager::endStream ( StreamID  streamID,
StreamContext const &  streamContext,
ExceptionCollector &  collector,
std::mutex &  collectorMutex 
)
noexcept

Definition at line 152 of file WorkerManager.cc.

References CMS_SA_ALLOW.

Referenced by edm::SecondaryEventProvider::endStream(), and edm::StreamSchedule::endStream().

155  {
156  for (auto& worker : allWorkers_) {
157  CMS_SA_ALLOW try { worker->endStream(streamID, streamContext); } catch (...) {
158  std::exception_ptr exceptionPtr = std::current_exception();
159  std::lock_guard<std::mutex> collectorLock(collectorMutex);
160  collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
161  }
162  }
163  }
#define CMS_SA_ALLOW
AllWorkers allWorkers_

◆ getWorker()

Worker * edm::WorkerManager::getWorker ( ParameterSet &  pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  label 
)

Definition at line 51 of file WorkerManager.cc.

References actionTable_, edm::WorkerRegistry::getWorker(), label, submitPVValidationJobs::params, muonDTDigis_cfi::pset, and workerReg_.

Referenced by addToUnscheduledWorkers().

55  {
56  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
57  return workerReg_.getWorker(params, label);
58  }
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:99
char const * label
WorkerRegistry workerReg_
Definition: WorkerManager.h:98
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.

◆ processAccumulatorsAsync()

template<typename T >
void edm::WorkerManager::processAccumulatorsAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  info,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 135 of file WorkerManager.h.

References visDQMUpload::context, info(), eostools::move(), edm::UnscheduledCallProducer::runAccumulatorsAsync(), TrackValidation_cff::task, unpackBuffers-CaloStage2::token, and unscheduled_.

Referenced by edm::StreamSchedule::processOneEventAsync().

140  {
141  unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
142  }
static const TGPicture * info(bool iBackgroundIsBlack)
UnscheduledCallProducer unscheduled_
void runAccumulatorsAsync(WaitingTaskHolder task, typename T::TransitionInfoType const &info, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context) noexcept
long double T
def move(src, dest)
Definition: eostools.py:511

◆ processOneOccurrenceAsync()

template<typename T , typename U >
void edm::WorkerManager::processOneOccurrenceAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType &  info,
ServiceToken const &  token,
StreamID  streamID,
typename T::Context const *  topContext,
U const *  context 
)
noexcept

Definition at line 106 of file WorkerManager.h.

References visDQMUpload::context, edm::Worker::doWorkNoPrefetchingAsync(), info(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, TrackValidation_cff::task, and unpackBuffers-CaloStage2::token.

Referenced by edm::StreamSchedule::processOneStreamAsync().

111  {
112  static_assert(!T::isEvent_);
113 
114  // Spawn them in reverse order. At least in the single threaded case that makes
115  // them run in forward order (and more likely to with multiple threads).
116  for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
117  Worker* worker = *it;
118 
119  ParentContext parentContext(context);
120 
121  // We do not need to run prefetching here because this only handles
122  // stream begin/end transitions for runs and lumis. There are no products
123  // put into the runs or lumis in stream transitions, so there can be
124  // no data dependencies which require prefetching. Prefetching is
125  // needed for global transitions, but they are run elsewhere.
126  // (One exception, the SecondaryEventProvider (used for mixing) sends
127  // global begin/end run/lumi transitions through here. They shouldn't
128  // need prefetching either and for some years nothing has been using
129  // that part of the code anyway...)
130  worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
131  }
132  }
static const TGPicture * info(bool iBackgroundIsBlack)
AllWorkers allWorkers_
long double T

◆ resetAll()

void edm::WorkerManager::resetAll ( )

Definition at line 165 of file WorkerManager.cc.

References allWorkers_, edm::for_all(), and edm::Worker::reset().

Referenced by edm::GlobalSchedule::processOneGlobalAsync(), edm::StreamSchedule::processOneStreamAsync(), and setupResolvers().

165 { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
AllWorkers allWorkers_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void reset()
Definition: Worker.h:190

◆ setupOnDemandSystem()

void edm::WorkerManager::setupOnDemandSystem ( EventTransitionInfo const &  info)

Definition at line 182 of file WorkerManager.cc.

References info(), edm::UnscheduledCallProducer::setEventTransitionInfo(), and unscheduled_.

Referenced by edm::StreamSchedule::processOneEventAsync(), and edm::SecondaryEventProvider::setupPileUpEvent().

182  {
183  unscheduled_.setEventTransitionInfo(info);
184  }
static const TGPicture * info(bool iBackgroundIsBlack)
UnscheduledCallProducer unscheduled_
void setEventTransitionInfo(EventTransitionInfo const &info)

◆ setupResolvers()

void edm::WorkerManager::setupResolvers ( Principal &  principal)

◆ unscheduledWorkers()

AllWorkers const& edm::WorkerManager::unscheduledWorkers ( ) const
inline

Member Data Documentation

◆ actionTable_

ExceptionToActionTable const* edm::WorkerManager::actionTable_
private

Definition at line 99 of file WorkerManager.h.

Referenced by actionTable(), and getWorker().

◆ allWorkers_

AllWorkers edm::WorkerManager::allWorkers_
private

◆ lastSetupEventPrincipal_

void const* edm::WorkerManager::lastSetupEventPrincipal_
private

Definition at line 102 of file WorkerManager.h.

Referenced by setupResolvers().

◆ unscheduled_

UnscheduledCallProducer edm::WorkerManager::unscheduled_
private

◆ workerReg_

WorkerRegistry edm::WorkerManager::workerReg_
private

Definition at line 98 of file WorkerManager.h.

Referenced by deleteModuleIfExists(), and getWorker().