26 : workerReg_(areg, typeResolverMaker),
30 lastSetupEventPrincipal_(nullptr) {}
33 std::shared_ptr<ActivityRegistry> areg,
35 : workerReg_(areg, modReg),
39 lastSetupEventPrincipal_(nullptr) {}
43 if (worker !=
nullptr) {
54 std::shared_ptr<ProcessConfiguration const> processConfiguration,
63 std::shared_ptr<ProcessConfiguration const> processConfiguration,
65 std::set<std::string>& unscheduledLabels,
66 std::vector<std::string>& shouldBeUsedLabels) {
74 unscheduledLabels.insert(
label);
79 shouldBeUsedLabels.push_back(
label);
87 std::exception_ptr exceptionPtr;
95 auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(
processName);
96 auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(
processName);
97 auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(
processName);
98 auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(
processName);
100 worker->updateLookup(
InProcess, *processBlockLookup);
101 worker->updateLookup(
InRun, *runLookup);
102 worker->updateLookup(
InLumi, *lumiLookup);
103 worker->updateLookup(
InEvent, *eventLookup);
104 worker->updateLookup(iESIndices);
105 worker->resolvePutIndicies(
InProcess, processBlockModuleToIndicies);
106 worker->resolvePutIndicies(
InRun, runModuleToIndicies);
107 worker->resolvePutIndicies(
InLumi, lumiModuleToIndicies);
108 worker->resolvePutIndicies(
InEvent, eventModuleToIndicies);
109 worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
113 exceptionPtr = std::current_exception();
117 CMS_SA_ALLOW try { worker->beginJob(globalContext); }
catch (...) {
119 exceptionPtr = std::current_exception();
124 std::rethrow_exception(exceptionPtr);
139 std::exception_ptr exceptionPtr;
141 CMS_SA_ALLOW try { worker->beginStream(streamID, streamContext); }
catch (...) {
143 exceptionPtr = std::current_exception();
148 std::rethrow_exception(exceptionPtr);
156 for (
auto& worker : allWorkers_) {
157 CMS_SA_ALLOW try { worker->endStream(streamID, streamContext); }
catch (...) {
158 std::exception_ptr exceptionPtr = std::current_exception();
159 std::lock_guard<std::mutex> collectorLock(collectorMutex);
160 collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
void addException(cms::Exception const &exception)
roAction_t actions[nactions]
void removeWorker(Worker const *worker)
static const std::string kFilterType("EDFilter")
UnscheduledCallProducer unscheduled_
void beginJob(ProductRegistry const &iRegistry, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, GlobalContext const &)
void setupOnDemandSystem(EventTransitionInfo const &)
void deleteModuleIfExists(std::string const &moduleLabel)
void setEventTransitionInfo(EventTransitionInfo const &info)
UnscheduledAuxiliary const & auxiliary() const
Worker const * get(std::string const &moduleLabel) const
ExceptionToActionTable const * actionTable_
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void const * lastSetupEventPrincipal_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void addWorker(Worker *aWorker)
void endJob(ExceptionCollector &, GlobalContext const &)
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions, ModuleTypeResolverMaker const *typeResolverMaker)
void deleteModule(std::string const &moduleLabel)
Deletes the module of the Worker, but the Worker continues to exist.
virtual Types moduleType() const =0
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
static const std::string kProducerType("EDProducer")
bool search_all(ForwardSequence const &s, Datum const &d)
def remove(d, key, TELL=False)
void setupResolvers(Principal &principal)
WorkerRegistry workerReg_
dictionary config
Read in AllInOne config in JSON format.
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
auto wrap(F iFunc) -> decltype(iFunc())
void endStream(StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
void beginStream(StreamID, StreamContext const &)
void addToAllWorkers(Worker *w)