CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Attributes
edm::WorkerManager Class Reference

#include <WorkerManager.h>

Public Types

typedef std::vector< Worker * > AllWorkers
 

Public Member Functions

ExceptionToActionTable const & actionTable () const
 
void addToAllWorkers (Worker *w)
 
void addToUnscheduledWorkers (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
 
AllWorkers const & allWorkers () const
 
void beginJob (ProductRegistry const &iRegistry, eventsetup::ESRecordsToProxyIndices const &)
 
void beginStream (StreamID iID, StreamContext &streamContext)
 
void endJob ()
 
void endJob (ExceptionCollector &collector)
 
void endStream (StreamID iID, StreamContext &streamContext)
 
Worker * getWorker (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
 
template<typename T >
void processAccumulatorsAsync (WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T , typename U >
void processOneOccurrence (typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
 
template<typename T , typename U >
void processOneOccurrenceAsync (WaitingTask *task, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
 
void resetAll ()
 
void setupOnDemandSystem (Principal &principal, EventSetupImpl const &es)
 
 WorkerManager (std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
 
 WorkerManager (std::shared_ptr< ModuleRegistry > modReg, std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
 
 WorkerManager (WorkerManager &&)=default
 

Private Attributes

ExceptionToActionTable const * actionTable_
 
AllWorkers allWorkers_
 
void const * lastSetupEventPrincipal_
 
UnscheduledCallProducer unscheduled_
 
WorkerRegistry workerReg_
 

Detailed Description

Definition at line 33 of file WorkerManager.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::WorkerManager::AllWorkers

Definition at line 35 of file WorkerManager.h.

Constructor & Destructor Documentation

◆ WorkerManager() [1/3]

edm::WorkerManager::WorkerManager ( std::shared_ptr< ActivityRegistry >  actReg,
ExceptionToActionTable const &  actions 
)

Definition at line 17 of file WorkerManager.cc.

18  : workerReg_(areg),
19  actionTable_(&actions),
20  allWorkers_(),
21  unscheduled_(*areg),
22  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager

◆ WorkerManager() [2/3]

edm::WorkerManager::WorkerManager ( WorkerManager &&  )
default

◆ WorkerManager() [3/3]

edm::WorkerManager::WorkerManager ( std::shared_ptr< ModuleRegistry >  modReg,
std::shared_ptr< ActivityRegistry >  actReg,
ExceptionToActionTable const &  actions 
)

Definition at line 24 of file WorkerManager.cc.

27  : workerReg_(areg, modReg),
28  actionTable_(&actions),
29  allWorkers_(),
30  unscheduled_(*areg),
31  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::WorkerManager::actionTable ( ) const
inline

Definition at line 89 of file WorkerManager.h.

89 { return *actionTable_; }

References actionTable_.

Referenced by edm::StreamSchedule::actionTable().

◆ addToAllWorkers()

void edm::WorkerManager::addToAllWorkers ( Worker *  w)

Definition at line 119 of file WorkerManager.cc.

119  {
120  if (!search_all(allWorkers_, w)) {
121  allWorkers_.push_back(w);
122  }
123  }

References allWorkers_, edm::search_all(), and w.

Referenced by edm::StreamSchedule::addToAllWorkers(), and addToUnscheduledWorkers().

◆ addToUnscheduledWorkers()

void edm::WorkerManager::addToUnscheduledWorkers ( ParameterSet &  pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
std::string  label,
std::set< std::string > &  unscheduledLabels,
std::vector< std::string > &  shouldBeUsedLabels 
)

Definition at line 42 of file WorkerManager.cc.

48  {
49  //Need to
50  // 1) create worker
51  // 2) if it is a WorkerT<EDProducer>, add it to our list
52  auto modType = pset.getParameter<std::string>("@module_edm_type");
53  if (modType == kProducerType || modType == kFilterType) {
54  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
55  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
56  unscheduledLabels.insert(label);
57  unscheduled_.addWorker(newWorker);
58  //add to list so it gets reset each new event
59  addToAllWorkers(newWorker);
60  } else {
61  shouldBeUsedLabels.push_back(label);
62  }
63  }

References addToAllWorkers(), edm::UnscheduledCallProducer::addWorker(), cms::cuda::assert(), getWorker(), edm::Worker::kFilter, kFilterType(), edm::Worker::kProducer, kProducerType(), label, edm::Worker::moduleType(), muonDTDigis_cfi::pset, AlCaHLTBitMon_QueryRunRegistry::string, and unscheduled_.

Referenced by edm::SecondaryEventProvider::SecondaryEventProvider(), and edm::StreamSchedule::StreamSchedule().

◆ allWorkers()

AllWorkers const& edm::WorkerManager::allWorkers ( ) const
inline

Definition at line 85 of file WorkerManager.h.

85 { return allWorkers_; }

References allWorkers_.

Referenced by edm::StreamSchedule::allWorkers().

◆ beginJob()

void edm::WorkerManager::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProxyIndices const &  iESIndices 
)

Definition at line 81 of file WorkerManager.cc.

82  {
83  auto const runLookup = iRegistry.productLookup(InRun);
84  auto const lumiLookup = iRegistry.productLookup(InLumi);
85  auto const eventLookup = iRegistry.productLookup(InEvent);
86  if (!allWorkers_.empty()) {
87  auto const& processName = allWorkers_[0]->description().processName();
88  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
89  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
90  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
91  for (auto& worker : allWorkers_) {
92  worker->updateLookup(InRun, *runLookup);
93  worker->updateLookup(InLumi, *lumiLookup);
94  worker->updateLookup(InEvent, *eventLookup);
95  worker->updateLookup(iESIndices);
96  worker->resolvePutIndicies(InRun, runModuleToIndicies);
97  worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
98  worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
99  }
100 
101  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
102  }
103  }

References allWorkers_, edm::Worker::beginJob(), edm::for_all(), edm::InEvent, edm::InLumi, edm::InRun, SimL1EmulatorRepack_CalouGT_cff::processName, and edm::ProductRegistry::productLookup().

Referenced by edm::SecondaryEventProvider::beginJob().

◆ beginStream()

void edm::WorkerManager::beginStream ( StreamID  iID,
StreamContext &  streamContext 
)

Definition at line 105 of file WorkerManager.cc.

105  {
106  for (auto& worker : allWorkers_) {
107  worker->beginStream(iID, streamContext);
108  }
109  }

References allWorkers_.

Referenced by edm::SecondaryEventProvider::beginStream(), and edm::StreamSchedule::beginStream().

◆ endJob() [1/2]

void edm::WorkerManager::endJob ( void  )

Definition at line 65 of file WorkerManager.cc.

65  {
66  for (auto& worker : allWorkers_) {
67  worker->endJob();
68  }
69  }

References allWorkers_.

Referenced by edm::SecondaryEventProvider::endJob().

◆ endJob() [2/2]

void edm::WorkerManager::endJob ( ExceptionCollector &  collector)

Definition at line 71 of file WorkerManager.cc.

71  {
72  for (auto& worker : allWorkers_) {
73  try {
74  convertException::wrap([&]() { worker->endJob(); });
75  } catch (cms::Exception const& ex) {
76  collector.addException(ex);
77  }
78  }
79  }

References edm::ExceptionCollector::addException(), allWorkers_, and edm::convertException::wrap().

◆ endStream()

void edm::WorkerManager::endStream ( StreamID  iID,
StreamContext &  streamContext 
)

Definition at line 111 of file WorkerManager.cc.

111  {
112  for (auto& worker : allWorkers_) {
113  worker->endStream(iID, streamContext);
114  }
115  }

References allWorkers_.

Referenced by edm::SecondaryEventProvider::endStream(), and edm::StreamSchedule::endStream().

◆ getWorker()

Worker * edm::WorkerManager::getWorker ( ParameterSet &  pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  label 
)

Definition at line 33 of file WorkerManager.cc.

37  {
38  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
39  return workerReg_.getWorker(params, label);
40  }

References actionTable_, edm::WorkerRegistry::getWorker(), label, CalibrationSummaryClient_cfi::params, muonDTDigis_cfi::pset, and workerReg_.

Referenced by addToUnscheduledWorkers(), and edm::StreamSchedule::fillWorkers().

◆ processAccumulatorsAsync()

template<typename T >
void edm::WorkerManager::processAccumulatorsAsync ( WaitingTask *  task,
typename T::MyPrincipal const &  ep,
EventSetupImpl const &  es,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

◆ processOneOccurrence()

template<typename T , typename U >
void edm::WorkerManager::processOneOccurrence ( typename T::MyPrincipal &  principal,
EventSetupImpl const &  eventSetup,
StreamID  streamID,
typename T::Context const *  topContext,
U const *  context,
bool  cleaningUpAfterException = false 
)

Definition at line 108 of file WorkerManager.h.

113  {
114  this->resetAll();
115 
116  auto waitTask = make_empty_waiting_task();
117  waitTask->increment_ref_count();
118  processOneOccurrenceAsync<T, U>(
119  waitTask.get(), ep, es, ServiceRegistry::instance().presentToken(), streamID, topContext, context);
120  waitTask->wait_for_all();
121  if (waitTask->exceptionPtr() != nullptr) {
122  try {
123  convertException::wrap([&]() { std::rethrow_exception(*(waitTask->exceptionPtr())); });
124  } catch (cms::Exception& ex) {
125  if (ex.context().empty()) {
126  addContextAndPrintException(
127  "Calling function WorkerManager::processOneOccurrence", ex, cleaningUpAfterException);
128  } else {
129  addContextAndPrintException("", ex, cleaningUpAfterException);
130  }
131  throw;
132  }
133  }
134  }

References edm::addContextAndPrintException(), cms::Exception::context(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::ServiceRegistry::instance(), edm::make_empty_waiting_task(), edm::ServiceRegistry::presentToken(), resetAll(), and edm::convertException::wrap().

Referenced by edm::SecondaryEventProvider::beginLuminosityBlock(), edm::SecondaryEventProvider::beginRun(), edm::SecondaryEventProvider::endLuminosityBlock(), and edm::SecondaryEventProvider::endRun().

◆ processOneOccurrenceAsync()

template<typename T , typename U >
void edm::WorkerManager::processOneOccurrenceAsync ( WaitingTask *  task,
typename T::MyPrincipal &  principal,
EventSetupImpl const &  eventSetup,
ServiceToken const &  token,
StreamID  streamID,
typename T::Context const *  topContext,
U const *  context 
)

Definition at line 137 of file WorkerManager.h.

143  {
144  //make sure the unscheduled items see this run or lumi transition
145  unscheduled_.runNowAsync<T, U>(task, ep, es, token, streamID, topContext, context);
146  }

References SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::UnscheduledCallProducer::runNowAsync(), TrackValidation_cff::task, unpackBuffers-CaloStage2::token, mitigatedMETSequence_cff::U, and unscheduled_.

Referenced by edm::StreamSchedule::processOneStreamAsync().

◆ resetAll()

void edm::WorkerManager::resetAll ( )

Definition at line 117 of file WorkerManager.cc.

117 { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }

References allWorkers_, edm::for_all(), and edm::Worker::reset().

Referenced by processOneOccurrence(), edm::StreamSchedule::processOneStreamAsync(), and setupOnDemandSystem().

◆ setupOnDemandSystem()

void edm::WorkerManager::setupOnDemandSystem ( Principal &  principal,
EventSetupImpl const &  es 
)

Member Data Documentation

◆ actionTable_

ExceptionToActionTable const* edm::WorkerManager::actionTable_
private

Definition at line 101 of file WorkerManager.h.

Referenced by actionTable(), and getWorker().

◆ allWorkers_

AllWorkers edm::WorkerManager::allWorkers_
private

◆ lastSetupEventPrincipal_

void const* edm::WorkerManager::lastSetupEventPrincipal_
private

Definition at line 104 of file WorkerManager.h.

Referenced by setupOnDemandSystem().

◆ unscheduled_

UnscheduledCallProducer edm::WorkerManager::unscheduled_
private

◆ workerReg_

WorkerRegistry edm::WorkerManager::workerReg_
private

Definition at line 100 of file WorkerManager.h.

Referenced by getWorker().

kProducerType
static const std::string kProducerType("EDProducer")
edm::WorkerManager::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: WorkerManager.cc:119
CalibrationSummaryClient_cfi.params
params
Definition: CalibrationSummaryClient_cfi.py:14
edm::WorkerManager::resetAll
void resetAll()
Definition: WorkerManager.cc:117
edm::Worker::beginJob
void beginJob()
Definition: Worker.cc:285
kFilterType
static const std::string kFilterType("EDFilter")
cms::cuda::assert
assert(be >=bs)
edm::Worker::kProducer
Definition: Worker.h:86
edm::InRun
Definition: BranchType.h:11
edm::Worker::reset
void reset()
Definition: Worker.h:180
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
config
Definition: config.py:1
edm::WorkerRegistry::getWorker
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
Definition: WorkerRegistry.cc:28
edm::Worker::kFilter
Definition: Worker.h:86
w
const double w
Definition: UKUtility.cc:23
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::WorkerManager::unscheduled_
UnscheduledCallProducer unscheduled_
Definition: WorkerManager.h:103
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::InEvent
Definition: BranchType.h:11
looper.config
config
Definition: looper.py:291
edm::WorkerManager::actionTable_
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:101
edm::WorkerManager::workerReg_
WorkerRegistry workerReg_
Definition: WorkerManager.h:100
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
edm::InLumi
Definition: BranchType.h:11
edm::UnscheduledCallProducer::addWorker
void addWorker(Worker *aWorker)
Definition: UnscheduledCallProducer.h:44
edm::ServiceRegistry::presentToken
ServiceToken presentToken() const
Definition: ServiceRegistry.cc:63
edm::UnscheduledCallProducer::setEventSetup
void setEventSetup(EventSetupImpl const &iSetup)
Definition: UnscheduledCallProducer.h:52
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
edm::WorkerManager::allWorkers_
AllWorkers allWorkers_
Definition: WorkerManager.h:102
edm::ServiceRegistry::instance
static ServiceRegistry & instance()
Definition: ServiceRegistry.cc:90
edm::make_empty_waiting_task
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
Definition: WaitingTaskList.h:96
SimL1EmulatorRepack_CalouGT_cff.processName
processName
Definition: SimL1EmulatorRepack_CalouGT_cff.py:17
edm::UnscheduledCallProducer::auxiliary
UnscheduledAuxiliary const & auxiliary() const
Definition: UnscheduledCallProducer.h:54
T
long double T
Definition: Basic3DVectorLD.h:48
UnscheduledConfigurator
edm::UnscheduledCallProducer::runNowAsync
void runNowAsync(WaitingTask *task, typename T::MyPrincipal &p, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context) const
Definition: UnscheduledCallProducer.h:60
edm::WorkerManager::getWorker
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
Definition: WorkerManager.cc:33
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
edm::search_all
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
cms::Exception
Definition: Exception.h:70
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
label
const char * label
Definition: PFTauDecayModeTools.cc:11
edm::UnscheduledCallProducer::runAccumulatorsAsync
void runAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: UnscheduledCallProducer.h:83
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::WorkerManager::lastSetupEventPrincipal_
void const * lastSetupEventPrincipal_
Definition: WorkerManager.h:104