CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
WorkerManager.cc
Go to the documentation of this file.
3 
10 
11 static const std::string kFilterType("EDFilter");
12 static const std::string kProducerType("EDProducer");
13 
14 namespace edm {
15  // -----------------------------
16 
17  WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg, ExceptionToActionTable const& actions)
18  : workerReg_(areg),
19  actionTable_(&actions),
20  allWorkers_(),
21  unscheduled_(*areg),
22  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
23 
24  WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
25  std::shared_ptr<ActivityRegistry> areg,
27  : workerReg_(areg, modReg),
28  actionTable_(&actions),
29  allWorkers_(),
30  unscheduled_(*areg),
31  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
32 
34  auto worker = workerReg_.get(moduleLabel);
35  if (worker != nullptr) {
36  auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
37  allWorkers_.erase(eraseBeg, allWorkers_.end());
38  unscheduled_.removeWorker(worker);
39  workerReg_.deleteModule(moduleLabel);
40  }
41  }
42 
46  std::shared_ptr<ProcessConfiguration const> processConfiguration,
47  std::string const& label) {
48  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
49  return workerReg_.getWorker(params, label);
50  }
51 
55  std::shared_ptr<ProcessConfiguration> processConfiguration,
57  std::set<std::string>& unscheduledLabels,
58  std::vector<std::string>& shouldBeUsedLabels) {
59  //Need to
60  // 1) create worker
61  // 2) if it is a WorkerT<EDProducer>, add it to our list
62  auto modType = pset.getParameter<std::string>("@module_edm_type");
63  if (modType == kProducerType || modType == kFilterType) {
64  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
65  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
66  unscheduledLabels.insert(label);
67  unscheduled_.addWorker(newWorker);
68  //add to list so it gets reset each new event
69  addToAllWorkers(newWorker);
70  } else {
71  shouldBeUsedLabels.push_back(label);
72  }
73  }
74 
76  for (auto& worker : allWorkers_) {
77  worker->endJob();
78  }
79  }
80 
82  for (auto& worker : allWorkers_) {
83  try {
84  convertException::wrap([&]() { worker->endJob(); });
85  } catch (cms::Exception const& ex) {
86  collector.addException(ex);
87  }
88  }
89  }
90 
92  eventsetup::ESRecordsToProxyIndices const& iESIndices,
93  ProcessBlockHelperBase const& processBlockHelperBase) {
94  auto const processBlockLookup = iRegistry.productLookup(InProcess);
95  auto const runLookup = iRegistry.productLookup(InRun);
96  auto const lumiLookup = iRegistry.productLookup(InLumi);
97  auto const eventLookup = iRegistry.productLookup(InEvent);
98  if (!allWorkers_.empty()) {
99  auto const& processName = allWorkers_[0]->description()->processName();
100  auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
101  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
102  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
103  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
104  for (auto& worker : allWorkers_) {
105  worker->updateLookup(InProcess, *processBlockLookup);
106  worker->updateLookup(InRun, *runLookup);
107  worker->updateLookup(InLumi, *lumiLookup);
108  worker->updateLookup(InEvent, *eventLookup);
109  worker->updateLookup(iESIndices);
110  worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
111  worker->resolvePutIndicies(InRun, runModuleToIndicies);
112  worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
113  worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
114  worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
115  }
116 
117  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
118  }
119  }
120 
122  for (auto& worker : allWorkers_) {
123  worker->beginStream(iID, streamContext);
124  }
125  }
126 
127  void WorkerManager::endStream(StreamID iID, StreamContext& streamContext) {
128  for (auto& worker : allWorkers_) {
129  worker->endStream(iID, streamContext);
130  }
131  }
132 
133  void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
134 
136  if (!search_all(allWorkers_, w)) {
137  allWorkers_.push_back(w);
138  }
139  }
140 
142  this->resetAll();
143  if (&ep != lastSetupEventPrincipal_) {
145  ep.setupUnscheduled(config);
147  }
148  }
149 
152  }
153 
154 } // namespace edm
void addException(cms::Exception const &exception)
static const TGPicture * info(bool iBackgroundIsBlack)
void setupUnscheduled(UnscheduledConfigurator const &)
Definition: Principal.cc:398
void removeWorker(Worker const *worker)
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
static const std::string kFilterType("EDFilter")
UnscheduledCallProducer unscheduled_
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
AllWorkers allWorkers_
void setupOnDemandSystem(EventTransitionInfo const &)
processConfiguration
Definition: Schedule.cc:687
void deleteModuleIfExists(std::string const &moduleLabel)
void beginJob(ProductRegistry const &iRegistry, eventsetup::ESRecordsToProxyIndices const &, ProcessBlockHelperBase const &)
assert(be >=bs)
actions
Definition: Schedule.cc:687
void setEventTransitionInfo(EventTransitionInfo const &info)
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
char const * label
void reset()
Definition: Worker.h:178
void deleteModule(std::string const &moduleLabel)
Deletes the module of the Worker, but the Worker continues to exist.
areg
Definition: Schedule.cc:687
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
static const std::string kProducerType("EDProducer")
UnscheduledAuxiliary const & auxiliary() const
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Worker const * get(std::string const &moduleLabel) const
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
tuple config
parse the configuration file
void setupResolvers(Principal &principal)
void beginJob()
Definition: Worker.cc:295
WorkerRegistry workerReg_
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
preg
Definition: Schedule.cc:687
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
prealloc
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)