1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
37 #include "boost/range/adaptor/reversed.hpp"
43 class GlobalScheduleSignalSentry {
46 : a_(a), context_(context), allowThrow_(
false) {
48 T::preScheduleSignal(a_, context_);
50 ~GlobalScheduleSignalSentry() noexcept(
false) {
54 T::postScheduleSignal(a_, context_);
62 void allowThrow() { allowThrow_ =
true; }
67 typename T::Context
const* context_;
73 class ExceptionCollector;
75 class PreallocationConfiguration;
77 class TriggerResultInserter;
78 class PathStatusInserter;
79 class EndPathStatusInserter;
91 std::shared_ptr<ModuleRegistry> modReg,
92 std::vector<std::string>
const& modulesToUse,
97 std::shared_ptr<ActivityRegistry>
areg,
102 template <
typename T>
104 typename T::TransitionInfoType&,
106 bool cleaningUpAfterException =
false);
161 template <
typename T>
163 typename T::TransitionInfoType& transitionInfo,
165 bool cleaningUpAfterException) {
166 auto const& principal = transitionInfo.principal();
171 auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal,
processContext_));
176 T::preScheduleSignal(
actReg_.get(), globalContext.get());
181 [
this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr
const* iPtr)
mutable {
182 std::exception_ptr excpt;
190 std::ostringstream ost;
192 ost <<
"Processing " << T::transitionName() <<
" ";
196 excpt = std::current_exception();
207 T::postScheduleSignal(
actReg_.get(), globalContext.get());
210 excpt = std::current_exception();
217 workerManager.resetAll();
222 workerManager.setupResolvers(transitionInfo.principal());
226 auto& aw = workerManager.allWorkers();
227 for (
Worker* worker : boost::adaptors::reverse(aw)) {
228 worker->doWorkAsync<
T>(
ServiceToken lock() const
std::vector< std::string > vstring
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
Clone the type of module with label iLabel but configure with iPSet.
std::vector< WorkerManager > workerManagers_
std::vector< Worker * > AllWorkers
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
edm::ActivityRegistry * reg_
static StreamID invalidStreamID()
bool terminate() const
Return whether each output module has reached its maximum count.
oneapi::tbb::task_group * group() const noexcept
void doneWaiting(std::exception_ptr iExcept)
std::vector< Worker * > Workers
AllWorkers const & allWorkers() const
Returns the collection of pointers to workers.
std::list< std::string > const & context() const
std::shared_ptr< Worker > WorkerPtr
FunctorWaitingTask< F > * make_waiting_task(F f)
void endJob(ExceptionCollector &collector)
std::shared_ptr< ActivityRegistry > actReg_
PreGlobalEarlyTermination preGlobalEarlyTerminationSignal_
~SendTerminationSignalIfException()
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &, ProcessBlockHelperBase const &)
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::GlobalContext const *iContext)
ExceptionToActionTable const & actionTable() const
Returns the action table.
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
GlobalContext const * context_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
void completedSuccessfully()
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, ProcessContext const *processContext)
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_