CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Attributes
edm::WorkerManager Class Reference

#include <WorkerManager.h>

Public Types

typedef std::vector< Worker * > AllWorkers
 

Public Member Functions

ExceptionToActionTable const & actionTable () const
 
void addToAllWorkers (Worker *w)
 
void addToUnscheduledWorkers (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
 
AllWorkers const & allWorkers () const
 
void beginJob (ProductRegistry const &iRegistry)
 
void beginStream (StreamID iID, StreamContext &streamContext)
 
void endJob ()
 
void endJob (ExceptionCollector &collector)
 
void endStream (StreamID iID, StreamContext &streamContext)
 
Worker * getWorker (ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
 
template<typename T >
void processAccumulatorsAsync (WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
 
template<typename T , typename U >
void processOneOccurrence (typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
 
template<typename T , typename U >
void processOneOccurrenceAsync (WaitingTask *task, typename T::MyPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
 
void resetAll ()
 
void setOnDemandProducts (ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
 
void setupOnDemandSystem (Principal &principal, EventSetup const &es)
 
 WorkerManager (std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
 
 WorkerManager (WorkerManager &&)=default
 
 WorkerManager (std::shared_ptr< ModuleRegistry > modReg, std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
 

Private Attributes

ExceptionToActionTable const * actionTable_
 
AllWorkers allWorkers_
 
void const * lastSetupEventPrincipal_
 
UnscheduledCallProducer unscheduled_
 
WorkerRegistry workerReg_
 

Detailed Description

Definition at line 31 of file WorkerManager.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::WorkerManager::AllWorkers

Definition at line 33 of file WorkerManager.h.

Constructor & Destructor Documentation

edm::WorkerManager::WorkerManager ( std::shared_ptr< ActivityRegistry >  actReg,
ExceptionToActionTable const &  actions 
)

Definition at line 17 of file WorkerManager.cc.

17  :
18  workerReg_(areg),
20  allWorkers_(),
21  unscheduled_(*areg),
23  {
24 
25  } // WorkerManager::WorkerManager
roAction_t actions[nactions]
Definition: GenABIO.cc:187
UnscheduledCallProducer unscheduled_
AllWorkers allWorkers_
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
WorkerRegistry workerReg_
edm::WorkerManager::WorkerManager ( WorkerManager &&  )
default
edm::WorkerManager::WorkerManager ( std::shared_ptr< ModuleRegistry >  modReg,
std::shared_ptr< ActivityRegistry >  actReg,
ExceptionToActionTable const &  actions 
)

Definition at line 27 of file WorkerManager.cc.

29  :
30  workerReg_(areg,modReg),
32  allWorkers_(),
33  unscheduled_(*areg),
35  {
36  } // WorkerManager::WorkerManager
roAction_t actions[nactions]
Definition: GenABIO.cc:187
UnscheduledCallProducer unscheduled_
AllWorkers allWorkers_
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
WorkerRegistry workerReg_

Member Function Documentation

ExceptionToActionTable const& edm::WorkerManager::actionTable ( ) const
inline

Definition at line 90 of file WorkerManager.h.

References actionTable_, getWorker(), label, muonDTDigis_cfi::pset, resetAll(), and AlCaHLTBitMon_QueryRunRegistry::string.

90 {return *actionTable_;}
ExceptionToActionTable const * actionTable_
void edm::WorkerManager::addToAllWorkers ( Worker *  w)

Definition at line 142 of file WorkerManager.cc.

References allWorkers_, and edm::search_all().

Referenced by edm::StreamSchedule::addToAllWorkers(), addToUnscheduledWorkers(), and allWorkers().

142  {
143  if(!search_all(allWorkers_, w)) {
144  allWorkers_.push_back(w);
145  }
146  }
const double w
Definition: UKUtility.cc:23
AllWorkers allWorkers_
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
void edm::WorkerManager::addToUnscheduledWorkers ( ParameterSet &  pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
std::string  label,
std::set< std::string > &  unscheduledLabels,
std::vector< std::string > &  shouldBeUsedLabels 
)

Definition at line 47 of file WorkerManager.cc.

References addToAllWorkers(), edm::UnscheduledCallProducer::addWorker(), edm::ParameterSet::getParameter(), getWorker(), edm::Worker::kFilter, kFilterType(), edm::Worker::kProducer, kProducerType(), edm::Worker::moduleType(), AlCaHLTBitMon_QueryRunRegistry::string, and unscheduled_.

Referenced by edm::SecondaryEventProvider::SecondaryEventProvider(), and edm::StreamSchedule::StreamSchedule().

53  {
54  //Need to
55  // 1) create worker
56  // 2) if it is a WorkerT<EDProducer>, add it to our list
57  auto modType = pset.getParameter<std::string>("@module_edm_type");
58  if(modType == kProducerType || modType == kFilterType) {
59  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
60  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
61  unscheduledLabels.insert(label);
62  unscheduled_.addWorker(newWorker);
63  //add to list so it gets reset each new event
64  addToAllWorkers(newWorker);
65  } else {
66  shouldBeUsedLabels.push_back(label);
67  }
68  }
static const std::string kFilterType("EDFilter")
UnscheduledCallProducer unscheduled_
char const * label
static const std::string kProducerType("EDProducer")
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void addToAllWorkers(Worker *w)
AllWorkers const& edm::WorkerManager::allWorkers ( ) const
inline

Definition at line 86 of file WorkerManager.h.

References addToAllWorkers(), allWorkers_, and w.

86 {return allWorkers_;}
AllWorkers allWorkers_
void edm::WorkerManager::beginJob ( ProductRegistry const &  iRegistry)

Definition at line 100 of file WorkerManager.cc.

References allWorkers_, edm::Worker::beginJob(), edm::for_all(), edm::InEvent, edm::InLumi, edm::InRun, modifiedElectrons_cfi::processName, and edm::ProductRegistry::productLookup().

Referenced by edm::SecondaryEventProvider::beginJob().

100  {
101  auto const runLookup = iRegistry.productLookup(InRun);
102  auto const lumiLookup = iRegistry.productLookup(InLumi);
103  auto const eventLookup = iRegistry.productLookup(InEvent);
104  if(!allWorkers_.empty()) {
105  auto const& processName = allWorkers_[0]->description().processName();
106  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
107  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
108  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
109  for(auto& worker : allWorkers_) {
110  worker->updateLookup(InRun,*runLookup);
111  worker->updateLookup(InLumi,*lumiLookup);
112  worker->updateLookup(InEvent,*eventLookup);
113  worker->resolvePutIndicies(InRun,runModuleToIndicies);
114  worker->resolvePutIndicies(InLumi,lumiModuleToIndicies);
115  worker->resolvePutIndicies(InEvent,eventModuleToIndicies);
116  }
117 
118  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
119  }
120  }
AllWorkers allWorkers_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void beginJob()
Definition: Worker.cc:289
void edm::WorkerManager::beginStream ( StreamID  iID,
StreamContext &  streamContext 
)

Definition at line 123 of file WorkerManager.cc.

References allWorkers_.

Referenced by edm::SecondaryEventProvider::beginStream(), and edm::StreamSchedule::beginStream().

123  {
124  for(auto& worker: allWorkers_) {
125  worker->beginStream(iID, streamContext);
126  }
127  }
AllWorkers allWorkers_
void edm::WorkerManager::endJob ( void  )

Definition at line 80 of file WorkerManager.cc.

References allWorkers_.

Referenced by edm::SecondaryEventProvider::endJob().

80  {
81  for(auto& worker : allWorkers_) {
82  worker->endJob();
83  }
84  }
AllWorkers allWorkers_
void edm::WorkerManager::endJob ( ExceptionCollector &  collector)

Definition at line 86 of file WorkerManager.cc.

References edm::ExceptionCollector::addException(), allWorkers_, and edm::convertException::wrap().

86  {
87  for(auto& worker : allWorkers_) {
88  try {
90  worker->endJob();
91  });
92  }
93  catch (cms::Exception const& ex) {
94  collector.addException(ex);
95  }
96  }
97  }
AllWorkers allWorkers_
auto wrap(F iFunc) -> decltype(iFunc())
void edm::WorkerManager::endStream ( StreamID  iID,
StreamContext &  streamContext 
)

Definition at line 130 of file WorkerManager.cc.

References allWorkers_.

Referenced by edm::SecondaryEventProvider::endStream(), and edm::StreamSchedule::endStream().

130  {
131  for(auto& worker: allWorkers_) {
132  worker->endStream(iID, streamContext);
133  }
134  }
AllWorkers allWorkers_
Worker * edm::WorkerManager::getWorker ( ParameterSet &  pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  label 
)

Definition at line 38 of file WorkerManager.cc.

References actionTable_, edm::WorkerRegistry::getWorker(), and workerReg_.

Referenced by actionTable(), addToUnscheduledWorkers(), and edm::StreamSchedule::fillWorkers().

42  {
43  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
44  return workerReg_.getWorker(params, label);
45  }
ExceptionToActionTable const * actionTable_
char const * label
WorkerRegistry workerReg_
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
template<typename T >
void edm::WorkerManager::processAccumulatorsAsync ( WaitingTask *  task,
typename T::MyPrincipal const &  ep,
EventSetup const &  es,
ServiceToken const &  token,
StreamID  streamID,
ParentContext const &  parentContext,
typename T::Context const *  context 
)

Definition at line 154 of file WorkerManager.h.

References edm::UnscheduledCallProducer::runAccumulatorsAsync(), TrackValidation_cff::task, and unscheduled_.

Referenced by edm::StreamSchedule::processOneEventAsync().

160  {
161  unscheduled_.runAccumulatorsAsync<T>(task, ep, es, token, streamID, parentContext, context);
162  }
UnscheduledCallProducer unscheduled_
void runAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
long double T
template<typename T , typename U >
void edm::WorkerManager::processOneOccurrence ( typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
StreamID  streamID,
typename T::Context const *  topContext,
U const *  context,
bool  cleaningUpAfterException = false 
)

Definition at line 111 of file WorkerManager.h.

References edm::addContextAndPrintException(), cms::Exception::context(), edm::ServiceRegistry::instance(), edm::make_empty_waiting_task(), edm::ServiceRegistry::presentToken(), resetAll(), and edm::convertException::wrap().

Referenced by edm::SecondaryEventProvider::beginLuminosityBlock(), edm::SecondaryEventProvider::beginRun(), edm::SecondaryEventProvider::endLuminosityBlock(), and edm::SecondaryEventProvider::endRun().

116  {
117  this->resetAll();
118 
119  auto waitTask = make_empty_waiting_task();
120  waitTask->increment_ref_count();
121  processOneOccurrenceAsync<T,U>(waitTask.get(), ep, es, ServiceRegistry::instance().presentToken(), streamID, topContext, context);
122  waitTask->wait_for_all();
123  if(waitTask->exceptionPtr() != nullptr) {
124  try{
125  convertException::wrap([&]() {
126  std::rethrow_exception(* (waitTask->exceptionPtr()) );
127  });
128  } catch(cms::Exception& ex) {
129  if (ex.context().empty()) {
130  addContextAndPrintException("Calling function WorkerManager::processOneOccurrence", ex, cleaningUpAfterException);
131  } else {
132  addContextAndPrintException("", ex, cleaningUpAfterException);
133  }
134  throw;
135  }
136  }
137  }
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
ServiceToken presentToken() const
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
auto wrap(F iFunc) -> decltype(iFunc())
template<typename T , typename U >
void edm::WorkerManager::processOneOccurrenceAsync ( WaitingTask *  task,
typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
ServiceToken const &  token,
StreamID  streamID,
typename T::Context const *  topContext,
U const *  context 
)

Definition at line 141 of file WorkerManager.h.

References edm::UnscheduledCallProducer::runNowAsync(), TrackValidation_cff::task, mitigatedMETSequence_cff::U, and unscheduled_.

147  {
148  //make sure the unscheduled items see this run or lumi transition
149  unscheduled_.runNowAsync<T,U>(task,ep, es, token, streamID, topContext, context);
150  }
UnscheduledCallProducer unscheduled_
void runNowAsync(WaitingTask *task, typename T::MyPrincipal &p, EventSetup const &es, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context) const
long double T
void edm::WorkerManager::resetAll ( )

Definition at line 137 of file WorkerManager.cc.

References allWorkers_, edm::for_all(), and edm::Worker::reset().

Referenced by actionTable(), processOneOccurrence(), and setupOnDemandSystem().

137  {
138  for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1));
139  }
AllWorkers allWorkers_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void reset()
Definition: Worker.h:178
void edm::WorkerManager::setOnDemandProducts ( ProductRegistry &  pregistry,
std::set< std::string > const &  unscheduledLabels 
) const

Definition at line 70 of file WorkerManager.cc.

References edm::InEvent, parseEventContent::prod, and edm::ProductRegistry::productListUpdator().

Referenced by edm::SecondaryEventProvider::SecondaryEventProvider(), and edm::StreamSchedule::StreamSchedule().

70  {
71  for(auto& prod : pregistry.productListUpdator()) {
72  if(prod.second.produced() &&
73  prod.second.branchType() == InEvent &&
74  unscheduledLabels.end() != unscheduledLabels.find(prod.second.moduleLabel())) {
75  prod.second.setOnDemand(true);
76  }
77  }
78  }
void edm::WorkerManager::setupOnDemandSystem ( Principal &  principal,
EventSetup const &  es 
)

Member Data Documentation

ExceptionToActionTable const* edm::WorkerManager::actionTable_
private

Definition at line 103 of file WorkerManager.h.

Referenced by actionTable(), and getWorker().

AllWorkers edm::WorkerManager::allWorkers_
private
void const* edm::WorkerManager::lastSetupEventPrincipal_
private

Definition at line 106 of file WorkerManager.h.

Referenced by setupOnDemandSystem().

UnscheduledCallProducer edm::WorkerManager::unscheduled_
private
WorkerRegistry edm::WorkerManager::workerReg_
private

Definition at line 102 of file WorkerManager.h.

Referenced by getWorker().