25 std::shared_ptr<TriggerResultInserter> inserter,
28 std::shared_ptr<ModuleRegistry> modReg,
29 std::vector<std::string>
const& iModulesToUse,
34 std::shared_ptr<ActivityRegistry> areg,
35 std::shared_ptr<ProcessConfiguration const> processConfiguration,
38 processContext_(processContext),
39 numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()),
44 for (
unsigned int i = 0;
i < nManagers; ++
i) {
50 if (modpset !=
nullptr) {
56 wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration,
moduleLabel));
61 inserter->doPreallocate(prealloc);
64 inserter, inserter->moduleDescription(), &
actions));
65 results_inserter->setActivityRegistry(
actReg_);
66 wm.addToAllWorkers(results_inserter.get());
71 for (
auto& pathStatusInserter : pathStatusInserters) {
72 std::shared_ptr<PathStatusInserter> inserterPtr =
get_underlying(pathStatusInserter);
73 inserterPtr->doPreallocate(prealloc);
78 workerPtr->setActivityRegistry(
actReg_);
79 wm.addToAllWorkers(workerPtr.get());
84 for (
auto& endPathStatusInserter : endPathStatusInserters) {
85 std::shared_ptr<EndPathStatusInserter> inserterPtr =
get_underlying(endPathStatusInserter);
86 inserterPtr->doPreallocate(prealloc);
89 inserterPtr, inserterPtr->moduleDescription(), &
actions));
90 workerPtr->setActivityRegistry(
actReg_);
91 wm.addToAllWorkers(workerPtr.get());
104 unsigned int const managerIndex =
107 std::exception_ptr exceptionPtr;
111 actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext);
114 exceptionContext(ex, globalContext,
"Handling pre signal, likely in a service function");
117 workerManagers_[managerIndex].beginJob(iRegistry, iESIndices, processBlockHelperBase, globalContext);
119 exceptionPtr = std::current_exception();
126 exceptionContext(ex, globalContext,
"Handling post signal, likely in a service function");
127 exceptionPtr = std::current_exception();
131 std::rethrow_exception(exceptionPtr);
137 unsigned int const managerIndex =
140 std::exception_ptr exceptionPtr;
145 exceptionContext(ex, globalContext,
"Handling pre signal, likely in a service function");
150 exceptionPtr = std::current_exception();
157 exceptionContext(ex, globalContext,
"Handling post signal, likely in a service function");
158 exceptionPtr = std::current_exception();
162 collector.
call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
168 unsigned int const jobManagerIndex =
170 unsigned int managerIndex = 0;
172 for (
auto const& worker : wm.allWorkers()) {
173 if (worker->description()->moduleLabel() == iLabel) {
178 if (
nullptr ==
found) {
183 if (managerIndex == jobManagerIndex) {
185 found->beginJob(globalContext);
193 wm.deleteModuleIfExists(iLabel);
198 std::vector<ModuleDescription const*>
result;
210 bool cleaningUpAfterException,
211 std::exception_ptr& excpt) {
216 std::ostringstream ost;
224 excpt = std::current_exception();
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual void replaceModuleFor(Worker *) const =0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
std::shared_ptr< Worker > WorkerPtr
void endJob(ExceptionCollector &collector)
unsigned int numberOfRuns() const
ServiceToken lock() const
std::shared_ptr< ActivityRegistry > actReg_
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, PathsAndConsumesOfModulesBase const &, ProcessContext const &)
unsigned int numberOfLuminosityBlocks() const
unsigned int numberOfConcurrentRuns_
constexpr T & get_underlying(propagate_const< T > &)
void handleException(GlobalContext const *, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &)
static constexpr unsigned int numberOfConcurrentProcessBlocks_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
void call(std::function< void(void)>)
std::list< std::string > const & context() const
unsigned int numberOfConcurrentLumis_
static constexpr unsigned int numberOfConcurrentJobs_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_