CMS 3D CMS Logo

WorkerManager.cc
Go to the documentation of this file.
3 
13 
14 #include <exception>
15 #include <functional>
16 
// File-local names of the module categories that addToUnscheduledWorkers
// compares against a module's "@module_edm_type" parameter.
17 static const std::string kFilterType("EDFilter");
18 static const std::string kProducerType("EDProducer");
19 
20 namespace edm {
21  // -----------------------------
22 
23  WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
25  ModuleTypeResolverMaker const* typeResolverMaker)
26  : workerReg_(areg, typeResolverMaker),
27  actionTable_(&actions),
28  allWorkers_(),
29  unscheduled_(*areg),
30  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
31 
32  WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
33  std::shared_ptr<ActivityRegistry> areg,
35  : workerReg_(areg, modReg),
36  actionTable_(&actions),
37  allWorkers_(),
38  unscheduled_(*areg),
39  lastSetupEventPrincipal_(nullptr) {} // WorkerManager::WorkerManager
40 
42  auto worker = workerReg_.get(moduleLabel);
43  if (worker != nullptr) {
44  auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
45  allWorkers_.erase(eraseBeg, allWorkers_.end());
46  unscheduled_.removeWorker(worker);
48  }
49  }
50 
52  ProductRegistry& preg,
53  PreallocationConfiguration const* prealloc,
54  std::shared_ptr<ProcessConfiguration const> processConfiguration,
55  std::string const& label) {
56  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
58  }
59 
61  ProductRegistry& preg,
62  PreallocationConfiguration const* prealloc,
63  std::shared_ptr<ProcessConfiguration const> processConfiguration,
65  std::set<std::string>& unscheduledLabels,
66  std::vector<std::string>& shouldBeUsedLabels) {
67  //Need to
68  // 1) create worker
69  // 2) if it is a WorkerT<EDProducer>, add it to our list
70  auto modType = pset.getParameter<std::string>("@module_edm_type");
71  if (modType == kProducerType || modType == kFilterType) {
72  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
73  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
74  unscheduledLabels.insert(label);
75  unscheduled_.addWorker(newWorker);
76  //add to list so it gets reset each new event
77  addToAllWorkers(newWorker);
78  } else {
79  shouldBeUsedLabels.push_back(label);
80  }
81  }
82 
85  ProcessBlockHelperBase const& processBlockHelperBase,
86  GlobalContext const& globalContext) {
87  std::exception_ptr exceptionPtr;
88  CMS_SA_ALLOW try {
89  auto const processBlockLookup = iRegistry.productLookup(InProcess);
90  auto const runLookup = iRegistry.productLookup(InRun);
91  auto const lumiLookup = iRegistry.productLookup(InLumi);
92  auto const eventLookup = iRegistry.productLookup(InEvent);
93  if (!allWorkers_.empty()) {
94  auto const& processName = allWorkers_[0]->description()->processName();
95  auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
96  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
97  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
98  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
99  for (auto& worker : allWorkers_) {
100  worker->updateLookup(InProcess, *processBlockLookup);
101  worker->updateLookup(InRun, *runLookup);
102  worker->updateLookup(InLumi, *lumiLookup);
103  worker->updateLookup(InEvent, *eventLookup);
104  worker->updateLookup(iESIndices);
105  worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
106  worker->resolvePutIndicies(InRun, runModuleToIndicies);
107  worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
108  worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
109  worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
110  }
111  }
112  } catch (...) {
113  exceptionPtr = std::current_exception();
114  }
115 
116  for (auto& worker : allWorkers_) {
117  CMS_SA_ALLOW try { worker->beginJob(globalContext); } catch (...) {
118  if (!exceptionPtr) {
119  exceptionPtr = std::current_exception();
120  }
121  }
122  }
123  if (exceptionPtr) {
124  std::rethrow_exception(exceptionPtr);
125  }
126  }
127 
128  void WorkerManager::endJob(ExceptionCollector& collector, GlobalContext const& globalContext) {
129  for (auto& worker : allWorkers_) {
130  try {
131  convertException::wrap([&worker, &globalContext]() { worker->endJob(globalContext); });
132  } catch (cms::Exception const& ex) {
133  collector.addException(ex);
134  }
135  }
136  }
137 
138  void WorkerManager::beginStream(StreamID streamID, StreamContext const& streamContext) {
139  std::exception_ptr exceptionPtr;
140  for (auto& worker : allWorkers_) {
141  CMS_SA_ALLOW try { worker->beginStream(streamID, streamContext); } catch (...) {
142  if (!exceptionPtr) {
143  exceptionPtr = std::current_exception();
144  }
145  }
146  }
147  if (exceptionPtr) {
148  std::rethrow_exception(exceptionPtr);
149  }
150  }
151 
153  StreamContext const& streamContext,
154  ExceptionCollector& collector,
155  std::mutex& collectorMutex) noexcept {
156  for (auto& worker : allWorkers_) {
157  CMS_SA_ALLOW try { worker->endStream(streamID, streamContext); } catch (...) {
158  std::exception_ptr exceptionPtr = std::current_exception();
159  std::lock_guard<std::mutex> collectorLock(collectorMutex);
160  collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
161  }
162  }
163  }
164 
165  void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
166 
168  if (!search_all(allWorkers_, w)) {
169  allWorkers_.push_back(w);
170  }
171  }
172 
174  this->resetAll();
175  if (&ep != lastSetupEventPrincipal_) {
177  ep.setupUnscheduled(config);
179  }
180  }
181 
184  }
185 
186 } // namespace edm
void addException(cms::Exception const &exception)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void removeWorker(Worker const *worker)
static const std::string kFilterType("EDFilter")
T w() const
UnscheduledCallProducer unscheduled_
void beginJob(ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, GlobalContext const &)
static std::mutex mutex
Definition: Proxy.cc:8
AllWorkers allWorkers_
void setupOnDemandSystem(EventTransitionInfo const &)
Definition: config.py:1
void deleteModuleIfExists(std::string const &moduleLabel)
assert(be >=bs)
void setEventTransitionInfo(EventTransitionInfo const &info)
UnscheduledAuxiliary const & auxiliary() const
Worker const * get(std::string const &moduleLabel) const
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:99
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void const * lastSetupEventPrincipal_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
char const * label
void endJob(ExceptionCollector &, GlobalContext const &)
void reset()
Definition: Worker.h:190
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions, ModuleTypeResolverMaker const *typeResolverMaker)
void deleteModule(std::string const &moduleLabel)
Deletes the module of the Worker, but the Worker continues to exist.
virtual Types moduleType() const =0
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
static const std::string kProducerType("EDProducer")
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:233
HLT enums.
void setupResolvers(Principal &principal)
WorkerRegistry workerReg_
Definition: WorkerManager.h:98
dictionary config
Read in AllInOne config in JSON format.
Definition: DiMuonV_cfg.py:30
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
auto wrap(F iFunc) -> decltype(iFunc())
void endStream(StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
void beginStream(StreamID, StreamContext const &)
void addToAllWorkers(Worker *w)