CMS 3D CMS Logo

WorkerManager.cc
Go to the documentation of this file.
3 
10 
// Module-type strings tested against the "@module_edm_type" parameter further
// down in this file: only modules whose type equals one of these two values are
// given a worker and registered with unscheduled_.
11 static const std::string kFilterType("EDFilter");
12 static const std::string kProducerType("EDProducer");
13 
14 namespace edm {
15  // -----------------------------
16 
// Constructs a WorkerManager whose worker registry is built from the activity
// registry and a module type resolver maker. No workers exist yet and no event
// principal has been set up (lastSetupEventPrincipal_ starts as nullptr).
// NOTE(review): listing line 18 is missing from this extract. The initializer
// list reads `actions`, and the declaration index at the end of this page lists
// an `ExceptionToActionTable const &actions` parameter - presumably that is the
// lost line; confirm against the real file.
17  WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
19  ModuleTypeResolverMaker const* typeResolverMaker)
20  : workerReg_(areg, typeResolverMaker),
21  actionTable_(&actions),  // keeps a pointer to `actions` rather than a copy
22  allWorkers_(),
23  unscheduled_(*areg),  // dereferences areg, so a null areg is not tolerated here
24  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
25 
// Constructs a WorkerManager whose worker registry is built from the activity
// registry and an existing module registry (modReg).
// NOTE(review): listing line 28 is missing from this extract. The initializer
// list reads `actions`, so the lost line presumably declares that parameter;
// confirm against the real file.
26  WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
27  std::shared_ptr<ActivityRegistry> areg,
29  : workerReg_(areg, modReg),
30  actionTable_(&actions),  // keeps a pointer to `actions` rather than a copy
31  allWorkers_(),
32  unscheduled_(*areg),  // dereferences areg, so a null areg is not tolerated here
33  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
34 
// NOTE(review): the signature line (listing line 35) and listing line 41 are
// missing from this extract. The declaration index at the end of this page lists
// `void deleteModuleIfExists(std::string const &moduleLabel)`, which presumably
// heads this body - confirm against the real file.
// Looks moduleLabel up in workerReg_. When a worker is found it is dropped from
// allWorkers_ and from unscheduled_; when the lookup returns nullptr none of the
// visible statements run.
36  auto worker = workerReg_.get(moduleLabel);
37  if (worker != nullptr) {
// std::remove only shifts the kept elements forward; the erase call that
// follows is what actually shrinks allWorkers_.
38  auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
39  allWorkers_.erase(eraseBeg, allWorkers_.end());
40  unscheduled_.removeWorker(worker);
42  }
43  }
44 
46  ProductRegistry& preg,
47  PreallocationConfiguration const* prealloc,
48  std::shared_ptr<ProcessConfiguration const> processConfiguration,
49  std::string const& label) {
// NOTE(review): the first signature line (listing line 45) and listing line 51
// are missing from this extract. The parameter list matches the `getWorker`
// entry in the declaration index at the end of this page, which also shows a
// leading `ParameterSet &pset` parameter and a `Worker *` return type - confirm
// against the real file. The statement that uses `params` and `label` is not
// visible here.
// Bundles the configuration inputs and the action table into a WorkerParams.
50  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
52  }
53 
55  ProductRegistry& preg,
56  PreallocationConfiguration const* prealloc,
57  std::shared_ptr<ProcessConfiguration const> processConfiguration,
59  std::set<std::string>& unscheduledLabels,
60  std::vector<std::string>& shouldBeUsedLabels) {
// NOTE(review): listing lines 54 and 58 are missing from this extract. The
// parameter list matches the `addToUnscheduledWorkers` entry in the declaration
// index at the end of this page, which also shows `ParameterSet &pset` and
// `std::string label` parameters - confirm against the real file.
// Outcome per call: an EDProducer or EDFilter module gets a worker, its label is
// inserted into unscheduledLabels, and the worker is registered with both
// unscheduled_ and allWorkers_. Any other module type only has its label
// appended to shouldBeUsedLabels.
61  //Need to
62  // 1) create worker
63  // 2) if it is a WorkerT<EDProducer>, add it to our list
64  auto modType = pset.getParameter<std::string>("@module_edm_type");
65  if (modType == kProducerType || modType == kFilterType) {
66  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
// Sanity check that the worker type agrees with the configured module type.
67  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
68  unscheduledLabels.insert(label);
69  unscheduled_.addWorker(newWorker);
70  //add to list so it gets reset each new event
71  addToAllWorkers(newWorker);
72  } else {
73  shouldBeUsedLabels.push_back(label);
74  }
75  }
76 
// NOTE(review): the signature line (listing line 77) is missing from this
// extract; the body uses no parameters, so this is presumably the no-argument
// endJob overload - confirm against the real file.
// Calls endJob() on every worker in allWorkers_. The visible body has no
// try/catch, so an exception thrown by one worker leaves the loop.
78  for (auto& worker : allWorkers_) {
79  worker->endJob();
80  }
81  }
82 
// NOTE(review): the signature line (listing line 83) is missing from this
// extract; the body uses a `collector` object, presumably a by-reference
// parameter of an endJob overload - confirm against the real file.
// Calls endJob() on every worker in allWorkers_, running each call through
// convertException::wrap. A cms::Exception coming out of a worker is handed to
// collector.addException and the loop moves on to the next worker.
84  for (auto& worker : allWorkers_) {
85  try {
86  convertException::wrap([&]() { worker->endJob(); });
87  } catch (cms::Exception const& ex) {
88  collector.addException(ex);
89  }
90  }
91  }
92 
95  ProcessBlockHelperBase const& processBlockHelperBase) {
// NOTE(review): listing lines 93-94 (the start of the signature) are missing
// from this extract. The declaration index at the end of this page lists
// `void beginJob(ProductRegistry const &iRegistry,
// eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &)`;
// the body also reads `iESIndices`, presumably the name of the second
// parameter - confirm against the real file.
// Fetches the product lookup for each of the four branch types, wires every
// worker to them, and then calls Worker::beginJob on all workers. Nothing past
// the four lookups runs when allWorkers_ is empty.
96  auto const processBlockLookup = iRegistry.productLookup(InProcess);
97  auto const runLookup = iRegistry.productLookup(InRun);
98  auto const lumiLookup = iRegistry.productLookup(InLumi);
99  auto const eventLookup = iRegistry.productLookup(InEvent);
100  if (!allWorkers_.empty()) {
// The process name is read from the first worker only and reused for all.
101  auto const& processName = allWorkers_[0]->description()->processName();
102  auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
103  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
104  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
105  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
// First pass: give each worker its lookups and put indices.
106  for (auto& worker : allWorkers_) {
107  worker->updateLookup(InProcess, *processBlockLookup);
108  worker->updateLookup(InRun, *runLookup);
109  worker->updateLookup(InLumi, *lumiLookup);
110  worker->updateLookup(InEvent, *eventLookup);
111  worker->updateLookup(iESIndices);
112  worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
113  worker->resolvePutIndicies(InRun, runModuleToIndicies);
114  worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
115  worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
116  worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
117  }
118 
// Second pass: beginJob is called only after every worker went through the
// loop above.
119  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
120  }
121  }
122 
// NOTE(review): the signature line (listing line 123) is missing from this
// extract. The declaration index at the end of this page lists
// `void beginStream(StreamID iID, StreamContext &streamContext)`, which matches
// the names used below - confirm against the real file.
// Calls beginStream(iID, streamContext) on every worker in allWorkers_.
124  for (auto& worker : allWorkers_) {
125  worker->beginStream(iID, streamContext);
126  }
127  }
128 
// Calls endStream(iID, streamContext) on every worker in allWorkers_, in
// container order, passing both arguments through unchanged.
129  void WorkerManager::endStream(StreamID iID, StreamContext& streamContext) {
130  for (auto& worker : allWorkers_) {
131  worker->endStream(iID, streamContext);
132  }
133  }
134 
// Calls Worker::reset on every element of allWorkers_ through for_all.
135  void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
136 
// NOTE(review): the signature line (listing line 137) is missing from this
// extract. The declaration index at the end of this page lists
// `void addToAllWorkers(Worker *w)`, which matches the name used below -
// confirm against the real file.
// Appends w to allWorkers_ unless search_all already finds it there, so the
// same worker is never stored twice.
138  if (!search_all(allWorkers_, w)) {
139  allWorkers_.push_back(w);
140  }
141  }
142 
// NOTE(review): the signature line (listing line 143) and listing lines 146 and
// 148 are missing from this extract. The declaration index at the end of this
// page lists `void setupResolvers(Principal &principal)`; the body below names
// its argument `ep` and uses a `config` object whose construction is not
// visible - confirm both against the real file.
// Resets every worker, then calls ep.setupUnscheduled(config) only when ep is
// not the principal recorded in lastSetupEventPrincipal_.
144  this->resetAll();
145  if (&ep != lastSetupEventPrincipal_) {
147  ep.setupUnscheduled(config);
149  }
150  }
151 
154  }
155 
156 } // namespace edm
void addException(cms::Exception const &exception)
static const TGPicture * info(bool iBackgroundIsBlack)
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void removeWorker(Worker const *worker)
void endStream(StreamID iID, StreamContext &streamContext)
static const std::string kFilterType("EDFilter")
T w() const
void beginJob(ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &)
UnscheduledCallProducer unscheduled_
AllWorkers allWorkers_
dictionary config
Read in AllInOne config in JSON format.
Definition: DMR_cfg.py:21
void setupOnDemandSystem(EventTransitionInfo const &)
Definition: config.py:1
void deleteModuleIfExists(std::string const &moduleLabel)
assert(be >=bs)
void setEventTransitionInfo(EventTransitionInfo const &info)
UnscheduledAuxiliary const & auxiliary() const
Worker const * get(std::string const &moduleLabel) const
ExceptionToActionTable const * actionTable_
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void const * lastSetupEventPrincipal_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
char const * label
void reset()
Definition: Worker.h:188
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions, ModuleTypeResolverMaker const *typeResolverMaker)
void deleteModule(std::string const &moduleLabel)
Deletes the module of the Worker, but the Worker continues to exist.
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
static const std::string kProducerType("EDProducer")
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
HLT enums.
void setupResolvers(Principal &principal)
void beginJob()
Definition: Worker.cc:280
WorkerRegistry workerReg_
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
auto wrap(F iFunc) -> decltype(iFunc())
void addToAllWorkers(Worker *w)