1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
102 class BranchIDListHelper;
103 class ExceptionCollector;
104 class ExceptionToActionTable;
105 class OutputModuleCommunicator;
106 class ProcessContext;
109 class ModuleRegistry;
110 class TriggerResultInserter;
111 class PathStatusInserter;
112 class EndPathStatusInserter;
113 class PreallocationConfiguration;
117 class TriggerNamesService;
121 template <
typename T>
122 class StreamScheduleSignalSentry {
125 : a_(a), context_(context), allowThrow_(
false) {
127 T::preScheduleSignal(a_, context_);
129 ~StreamScheduleSignalSentry() noexcept(
false) {
133 T::postScheduleSignal(a_, context_);
142 void allowThrow() { allowThrow_ =
true; }
147 typename T::Context
const* context_;
169 std::shared_ptr<ModuleRegistry>,
176 std::shared_ptr<ActivityRegistry>
areg,
189 template <
typename T>
191 typename T::TransitionInfoType& transitionInfo,
193 bool cleaningUpAfterException =
false);
209 void availablePaths(std::vector<std::string>& oLabelsToFill)
const;
215 std::vector<ModuleDescription const*>& descriptions,
216 unsigned int hint)
const;
219 std::vector<ModuleDescription const*>& descriptions,
220 unsigned int hint)
const;
249 std::vector<std::string>
const& branchesToDeleteEarly,
298 std::vector<std::string>
const& endPathNames);
306 std::vector<std::string>
const& endPathNames);
313 std::vector<std::string>
const& endPathNames);
367 reportSvc->reportSkippedEvent(ep.
id().
run(), ep.
id().
event());
370 template <
typename T>
372 typename T::TransitionInfoType& transitionInfo,
374 bool cleaningUpAfterException) {
375 auto const& principal = transitionInfo.principal();
378 auto id = principal.id();
381 [
this, iHolder,
id, cleaningUpAfterException, weakToken](std::exception_ptr
const* iPtr)
mutable {
382 std::exception_ptr excpt;
390 std::ostringstream ost;
392 ost <<
"Processing " << T::transitionName() <<
" " <<
id;
396 excpt = std::current_exception();
408 excpt = std::current_exception();
416 auto token = weakToken.
lock();
424 h.doneWaiting(std::current_exception());
443 iHolder.
group()->run([task]() {
448 oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
449 arena.enqueue([task]() {
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
EventNumber_t event() const
StreamContext const * context_
ServiceToken lock() const
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
uint16_t *__restrict__ id
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< Worker > WorkerPtr
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
oneapi::tbb::task_group * group() const noexcept
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
FunctorWaitingTask< F > * make_waiting_task(F f)
std::vector< Worker * > AllWorkers
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
AllWorkers const & allWorkers() const
unsigned int value() const
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorTask< F > * make_functor_task(F f)
~SendTerminationSignalIfException()
std::atomic< bool > skippingEvent_
StreamContext const & context() const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
auto wrap(F iFunc) -> decltype(iFunc())
edm::ActivityRegistry * reg_
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
TrigResConstPtr results() const
std::vector< Path > TrigPaths
ExceptionToActionTable const & actionTable() const
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void completedSuccessfully()