1 #ifndef FWCore_Framework_GlobalSchedule_h 2 #define FWCore_Framework_GlobalSchedule_h 38 #include "boost/range/adaptor/reversed.hpp" 42 class ExceptionCollector;
43 class PreallocationConfiguration;
45 class TriggerResultInserter;
46 class PathStatusInserter;
47 class EndPathStatusInserter;
59 std::shared_ptr<ModuleRegistry> modReg,
60 std::vector<std::string>
const& modulesToUse,
65 std::shared_ptr<ActivityRegistry> areg,
66 std::shared_ptr<ProcessConfiguration const> processConfiguration,
72 typename T::TransitionInfoType&,
74 bool cleaningUpAfterException =
false);
107 template <
typename T>
110 template <
typename T>
115 bool cleaningUpAfterException,
116 std::exception_ptr&);
132 template <
typename T>
134 typename T::TransitionInfoType& transitionInfo,
136 bool cleaningUpAfterException) {
137 auto const& principal = transitionInfo.principal();
142 auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal,
processContext_));
146 [
this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr
const* iPtr)
mutable {
147 std::exception_ptr excpt;
151 handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
153 postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
161 preScheduleSignal<T>(globalContext.get(),
token);
163 unsigned int managerIndex = principal.index();
179 worker->doWorkAsync<
T>(
183 holdForLoop.doneWaiting(std::current_exception());
190 template <
typename T>
197 exceptionContext(ex, *globalContext,
"Handling pre signal, likely in a service function");
203 template <
typename T>
206 std::exception_ptr& excpt) {
211 T::postScheduleSignal(
actReg_.get(), globalContext);
215 exceptionContext(ex, *globalContext,
"Handling post signal, likely in a service function");
216 excpt = std::current_exception();
std::vector< std::string > vstring
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > AllWorkers
ExceptionToActionTable const & actionTable() const
returns the action table
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
static StreamID invalidStreamID()
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
oneapi::tbb::task_group * group() const noexcept
bool terminate() const
Return whether each output module has reached its maximum count.
void preScheduleSignal(GlobalContext const *, ServiceToken const &)
std::vector< Worker * > Workers
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
std::shared_ptr< Worker > WorkerPtr
AllWorkers const & allWorkers() const
FunctorWaitingTask< F > * make_waiting_task(F f)
void endJob(ExceptionCollector &collector)
void postScheduleSignal(GlobalContext const *, ServiceWeakToken const &, std::exception_ptr &)
ServiceToken lock() const
void doneWaiting(std::exception_ptr iExcept) noexcept
std::shared_ptr< ActivityRegistry > actReg_
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, PathsAndConsumesOfModulesBase const &, ProcessContext const &)
unsigned int numberOfConcurrentRuns_
void handleException(GlobalContext const *, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &)
void setupResolvers(Principal &principal)
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
static constexpr unsigned int numberOfConcurrentProcessBlocks_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
unsigned int numberOfConcurrentLumis_
static constexpr unsigned int numberOfConcurrentJobs_
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_