19 actionTable_(&actions),
22 lastSetupEventPrincipal_(
nullptr)
28 std::shared_ptr<ActivityRegistry> areg,
41 std::shared_ptr<ProcessConfiguration const> processConfiguration,
50 std::shared_ptr<ProcessConfiguration> processConfiguration,
52 std::set<std::string>& unscheduledLabels,
53 std::vector<std::string>& shouldBeUsedLabels) {
59 Worker* newWorker =
getWorker(pset, preg, prealloc, processConfiguration, label);
61 unscheduledLabels.insert(label);
66 shouldBeUsedLabels.push_back(label);
72 if(
prod.second.produced() &&
74 unscheduledLabels.end() != unscheduledLabels.find(
prod.second.moduleLabel())) {
75 prod.second.setOnDemand(
true);
106 auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(
processName);
107 auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(
processName);
108 auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(
processName);
110 worker->updateLookup(
InRun,*runLookup);
111 worker->updateLookup(
InLumi,*lumiLookup);
112 worker->updateLookup(
InEvent,*eventLookup);
113 worker->resolvePutIndicies(
InRun,runModuleToIndicies);
114 worker->resolvePutIndicies(
InLumi,lumiModuleToIndicies);
115 worker->resolvePutIndicies(
InEvent,eventModuleToIndicies);
125 worker->beginStream(iID, streamContext);
132 worker->endStream(iID, streamContext);
T getParameter(std::string const &) const
void addException(cms::Exception const &exception)
void setupUnscheduled(UnscheduledConfigurator const &)
roAction_t actions[nactions]
void endStream(StreamID iID, StreamContext &streamContext)
static const std::string kFilterType("EDFilter")
UnscheduledCallProducer unscheduled_
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void beginJob(ProductRegistry const &iRegistry)
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void addWorker(Worker *aWorker)
void beginStream(StreamID iID, StreamContext &streamContext)
static const std::string kProducerType("EDProducer")
UnscheduledAuxiliary const & auxiliary() const
bool search_all(ForwardSequence const &s, Datum const &d)
ProductList & productListUpdator()
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
void setEventSetup(EventSetup const &iSetup)
WorkerRegistry workerReg_
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void addToAllWorkers(Worker *w)