CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Attributes
edm::WorkerManager Class Reference

#include <WorkerManager.h>

Public Types

typedef std::vector< Worker * > AllWorkers
 

Public Member Functions

ExceptionToActionTable const & actionTable () const
 
void addToAllWorkers (Worker *w)
 
void addToUnscheduledWorkers (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
 
AllWorkers const & allWorkers () const
 
void beginJob (ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &)
 
void beginStream (StreamID iID, StreamContext &streamContext)
 
void deleteModuleIfExists (std::string const &moduleLabel)
 
void endJob ()
 
void endJob (ExceptionCollector &collector)
 
void endStream (StreamID iID, StreamContext &streamContext)
 
WorkergetWorker (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
 
template<typename T >
void processAccumulatorsAsync (WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
 
template<typename T , typename U >
void processOneOccurrenceAsync (WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context) noexcept
 
void resetAll ()
 
void setupOnDemandSystem (EventTransitionInfo const &)
 
void setupResolvers (Principal &principal)
 
AllWorkers const & unscheduledWorkers () const
 
 WorkerManager (std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions, ModuleTypeResolverMaker const *typeResolverMaker)
 
 WorkerManager (WorkerManager &&)=default
 
 WorkerManager (std::shared_ptr< ModuleRegistry > modReg, std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
 

Private Attributes

ExceptionToActionTable const * actionTable_
 
AllWorkers allWorkers_
 
void const * lastSetupEventPrincipal_
 
UnscheduledCallProducer unscheduled_
 
WorkerRegistry workerReg_
 

Detailed Description

Definition at line 29 of file WorkerManager.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::WorkerManager::AllWorkers

Definition at line 31 of file WorkerManager.h.

Constructor & Destructor Documentation

◆ WorkerManager() [1/3]

edm::WorkerManager::WorkerManager ( std::shared_ptr< ActivityRegistry actReg,
ExceptionToActionTable const &  actions,
ModuleTypeResolverMaker const *  typeResolverMaker 
)

Definition at line 21 of file WorkerManager.cc.

24  : workerReg_(areg, typeResolverMaker),
26  allWorkers_(),
27  unscheduled_(*areg),
28  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
roAction_t actions[nactions]
Definition: GenABIO.cc:181
UnscheduledCallProducer unscheduled_
Definition: WorkerManager.h:99
AllWorkers allWorkers_
Definition: WorkerManager.h:98
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:97
void const * lastSetupEventPrincipal_
WorkerRegistry workerReg_
Definition: WorkerManager.h:96

◆ WorkerManager() [2/3]

edm::WorkerManager::WorkerManager ( WorkerManager &&  )
default

◆ WorkerManager() [3/3]

edm::WorkerManager::WorkerManager ( std::shared_ptr< ModuleRegistry modReg,
std::shared_ptr< ActivityRegistry actReg,
ExceptionToActionTable const &  actions 
)

Definition at line 30 of file WorkerManager.cc.

33  : workerReg_(areg, modReg),
35  allWorkers_(),
36  unscheduled_(*areg),
37  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
roAction_t actions[nactions]
Definition: GenABIO.cc:181
UnscheduledCallProducer unscheduled_
Definition: WorkerManager.h:99
AllWorkers allWorkers_
Definition: WorkerManager.h:98
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:97
void const * lastSetupEventPrincipal_
WorkerRegistry workerReg_
Definition: WorkerManager.h:96

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::WorkerManager::actionTable ( ) const
inline

Definition at line 85 of file WorkerManager.h.

References actionTable_.

Referenced by edm::StreamSchedule::actionTable().

85 { return *actionTable_; }
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:97

◆ addToAllWorkers()

void edm::WorkerManager::addToAllWorkers ( Worker w)

Definition at line 141 of file WorkerManager.cc.

References allWorkers_, edm::search_all(), and w().

Referenced by edm::StreamSchedule::addToAllWorkers(), addToUnscheduledWorkers(), and edm::StreamSchedule::StreamSchedule().

141  {
142  if (!search_all(allWorkers_, w)) {
143  allWorkers_.push_back(w);
144  }
145  }
T w() const
AllWorkers allWorkers_
Definition: WorkerManager.h:98
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36

◆ addToUnscheduledWorkers()

void edm::WorkerManager::addToUnscheduledWorkers ( ParameterSet pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string  label,
std::set< std::string > &  unscheduledLabels,
std::vector< std::string > &  shouldBeUsedLabels 
)

Definition at line 58 of file WorkerManager.cc.

References addToAllWorkers(), edm::UnscheduledCallProducer::addWorker(), cms::cuda::assert(), getWorker(), edm::Worker::kFilter, kFilterType(), edm::Worker::kProducer, kProducerType(), label, edm::Worker::moduleType(), muonDTDigis_cfi::pset, AlCaHLTBitMon_QueryRunRegistry::string, and unscheduled_.

Referenced by edm::SecondaryEventProvider::SecondaryEventProvider(), and edm::StreamSchedule::StreamSchedule().

64  {
65  //Need to
66  // 1) create worker
67  // 2) if it is a WorkerT<EDProducer>, add it to our list
68  auto modType = pset.getParameter<std::string>("@module_edm_type");
69  if (modType == kProducerType || modType == kFilterType) {
70  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
71  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
72  unscheduledLabels.insert(label);
73  unscheduled_.addWorker(newWorker);
74  //add to list so it gets reset each new event
75  addToAllWorkers(newWorker);
76  } else {
77  shouldBeUsedLabels.push_back(label);
78  }
79  }
static const std::string kFilterType("EDFilter")
UnscheduledCallProducer unscheduled_
Definition: WorkerManager.h:99
assert(be >=bs)
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
char const * label
static const std::string kProducerType("EDProducer")
void addToAllWorkers(Worker *w)

◆ allWorkers()

AllWorkers const& edm::WorkerManager::allWorkers ( ) const
inline

◆ beginJob()

void edm::WorkerManager::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProductResolverIndices const &  iESIndices,
ProcessBlockHelperBase const &  processBlockHelperBase 
)

Definition at line 97 of file WorkerManager.cc.

References allWorkers_, edm::Worker::beginJob(), edm::for_all(), edm::InEvent, edm::InLumi, edm::InProcess, edm::InRun, SimL1EmulatorRepack_CalouGT_cff::processName, and edm::ProductRegistry::productLookup().

Referenced by edm::SecondaryEventProvider::beginJob().

99  {
100  auto const processBlockLookup = iRegistry.productLookup(InProcess);
101  auto const runLookup = iRegistry.productLookup(InRun);
102  auto const lumiLookup = iRegistry.productLookup(InLumi);
103  auto const eventLookup = iRegistry.productLookup(InEvent);
104  if (!allWorkers_.empty()) {
105  auto const& processName = allWorkers_[0]->description()->processName();
106  auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
107  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
108  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
109  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
110  for (auto& worker : allWorkers_) {
111  worker->updateLookup(InProcess, *processBlockLookup);
112  worker->updateLookup(InRun, *runLookup);
113  worker->updateLookup(InLumi, *lumiLookup);
114  worker->updateLookup(InEvent, *eventLookup);
115  worker->updateLookup(iESIndices);
116  worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
117  worker->resolvePutIndicies(InRun, runModuleToIndicies);
118  worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
119  worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
120  worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
121  }
122 
123  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
124  }
125  }
AllWorkers allWorkers_
Definition: WorkerManager.h:98
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void beginJob()
Definition: Worker.cc:300

◆ beginStream()

void edm::WorkerManager::beginStream ( StreamID  iID,
StreamContext streamContext 
)

Definition at line 127 of file WorkerManager.cc.

References allWorkers_.

Referenced by edm::SecondaryEventProvider::beginStream(), and edm::StreamSchedule::beginStream().

127  {
128  for (auto& worker : allWorkers_) {
129  worker->beginStream(iID, streamContext);
130  }
131  }
AllWorkers allWorkers_
Definition: WorkerManager.h:98

◆ deleteModuleIfExists()

void edm::WorkerManager::deleteModuleIfExists ( std::string const &  moduleLabel)

Definition at line 39 of file WorkerManager.cc.

References allWorkers_, edm::WorkerRegistry::deleteModule(), edm::WorkerRegistry::get(), HerwigMaxPtPartonFilter_cfi::moduleLabel, MatrixUtil::remove(), edm::UnscheduledCallProducer::removeWorker(), unscheduled_, and workerReg_.

Referenced by edm::StreamSchedule::deleteModule().

39  {
40  auto worker = workerReg_.get(moduleLabel);
41  if (worker != nullptr) {
42  auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
43  allWorkers_.erase(eraseBeg, allWorkers_.end());
44  unscheduled_.removeWorker(worker);
46  }
47  }
void removeWorker(Worker const *worker)
UnscheduledCallProducer unscheduled_
Definition: WorkerManager.h:99
AllWorkers allWorkers_
Definition: WorkerManager.h:98
Worker const * get(std::string const &moduleLabel) const
void deleteModule(std::string const &moduleLabel)
Deletes the module of the Worker, but the Worker continues to exist.
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
WorkerRegistry workerReg_
Definition: WorkerManager.h:96

◆ endJob() [1/2]

void edm::WorkerManager::endJob ( void  )

Definition at line 81 of file WorkerManager.cc.

References allWorkers_.

Referenced by edm::SecondaryEventProvider::endJob().

81  {
82  for (auto& worker : allWorkers_) {
83  worker->endJob();
84  }
85  }
AllWorkers allWorkers_
Definition: WorkerManager.h:98

◆ endJob() [2/2]

void edm::WorkerManager::endJob ( ExceptionCollector collector)

Definition at line 87 of file WorkerManager.cc.

References edm::ExceptionCollector::addException(), allWorkers_, and edm::convertException::wrap().

87  {
88  for (auto& worker : allWorkers_) {
89  try {
90  convertException::wrap([&]() { worker->endJob(); });
91  } catch (cms::Exception const& ex) {
92  collector.addException(ex);
93  }
94  }
95  }
AllWorkers allWorkers_
Definition: WorkerManager.h:98
auto wrap(F iFunc) -> decltype(iFunc())

◆ endStream()

void edm::WorkerManager::endStream ( StreamID  iID,
StreamContext streamContext 
)

Definition at line 133 of file WorkerManager.cc.

References allWorkers_.

Referenced by edm::SecondaryEventProvider::endStream(), and edm::StreamSchedule::endStream().

133  {
134  for (auto& worker : allWorkers_) {
135  worker->endStream(iID, streamContext);
136  }
137  }
AllWorkers allWorkers_
Definition: WorkerManager.h:98

◆ getWorker()

Worker * edm::WorkerManager::getWorker ( ParameterSet pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  label 
)

Definition at line 49 of file WorkerManager.cc.

References actionTable_, edm::WorkerRegistry::getWorker(), label, submitPVValidationJobs::params, muonDTDigis_cfi::pset, and workerReg_.

Referenced by addToUnscheduledWorkers().

53  {
54  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
56  }
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:97
char const * label
WorkerRegistry workerReg_
Definition: WorkerManager.h:96
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.

◆ processAccumulatorsAsync()

template<typename T >
void edm::WorkerManager::processAccumulatorsAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType const &  info,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 133 of file WorkerManager.h.

References visDQMUpload::context, info(), eostools::move(), edm::UnscheduledCallProducer::runAccumulatorsAsync(), TrackValidation_cff::task, unpackBuffers-CaloStage2::token, and unscheduled_.

Referenced by edm::StreamSchedule::processOneEventAsync().

138  {
139  unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
140  }
static const TGPicture * info(bool iBackgroundIsBlack)
UnscheduledCallProducer unscheduled_
Definition: WorkerManager.h:99
void runAccumulatorsAsync(WaitingTaskHolder task, typename T::TransitionInfoType const &info, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context) noexcept
long double T
def move(src, dest)
Definition: eostools.py:511

◆ processOneOccurrenceAsync()

template<typename T , typename U >
void edm::WorkerManager::processOneOccurrenceAsync ( WaitingTaskHolder  task,
typename T::TransitionInfoType &  info,
ServiceToken const &  token,
StreamID  streamID,
typename T::Context const *  topContext,
U const *  context 
)
noexcept

Definition at line 104 of file WorkerManager.h.

References visDQMUpload::context, edm::Worker::doWorkNoPrefetchingAsync(), info(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, TrackValidation_cff::task, and unpackBuffers-CaloStage2::token.

Referenced by edm::StreamSchedule::processOneStreamAsync().

109  {
110  static_assert(!T::isEvent_);
111 
112  // Spawn them in reverse order. At least in the single threaded case that makes
113  // them run in forward order (and more likely to with multiple threads).
114  for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
115  Worker* worker = *it;
116 
117  ParentContext parentContext(context);
118 
119  // We do not need to run prefetching here because this only handles
120  // stream begin/end transitions for runs and lumis. There are no products
121  // put into the runs or lumis in stream transitions, so there can be
122  // no data dependencies which require prefetching. Prefetching is
123  // needed for global transitions, but they are run elsewhere.
124  // (One exception, the SecondaryEventProvider (used for mixing) sends
125  // global begin/end run/lumi transitions through here. They shouldn't
126  // need prefetching either and for some years nothing has been using
127  // that part of the code anyway...)
128  worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
129  }
130  }
static const TGPicture * info(bool iBackgroundIsBlack)
AllWorkers allWorkers_
Definition: WorkerManager.h:98
long double T

◆ resetAll()

void edm::WorkerManager::resetAll ( )

Definition at line 139 of file WorkerManager.cc.

References allWorkers_, edm::for_all(), and edm::Worker::reset().

Referenced by edm::GlobalSchedule::processOneGlobalAsync(), edm::StreamSchedule::processOneStreamAsync(), and setupResolvers().

139 { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
AllWorkers allWorkers_
Definition: WorkerManager.h:98
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void reset()
Definition: Worker.h:191

◆ setupOnDemandSystem()

void edm::WorkerManager::setupOnDemandSystem ( EventTransitionInfo const &  info)

Definition at line 156 of file WorkerManager.cc.

References info(), edm::UnscheduledCallProducer::setEventTransitionInfo(), and unscheduled_.

Referenced by edm::StreamSchedule::processOneEventAsync(), and edm::SecondaryEventProvider::setupPileUpEvent().

156  {
158  }
static const TGPicture * info(bool iBackgroundIsBlack)
UnscheduledCallProducer unscheduled_
Definition: WorkerManager.h:99
void setEventTransitionInfo(EventTransitionInfo const &info)

◆ setupResolvers()

void edm::WorkerManager::setupResolvers ( Principal principal)

◆ unscheduledWorkers()

AllWorkers const& edm::WorkerManager::unscheduledWorkers ( ) const
inline

Member Data Documentation

◆ actionTable_

ExceptionToActionTable const* edm::WorkerManager::actionTable_
private

Definition at line 97 of file WorkerManager.h.

Referenced by actionTable(), and getWorker().

◆ allWorkers_

AllWorkers edm::WorkerManager::allWorkers_
private

◆ lastSetupEventPrincipal_

void const* edm::WorkerManager::lastSetupEventPrincipal_
private

Definition at line 100 of file WorkerManager.h.

Referenced by setupResolvers().

◆ unscheduled_

UnscheduledCallProducer edm::WorkerManager::unscheduled_
private

◆ workerReg_

WorkerRegistry edm::WorkerManager::workerReg_
private

Definition at line 96 of file WorkerManager.h.

Referenced by deleteModuleIfExists(), and getWorker().