1 #ifndef FWCore_Framework_StreamSchedule_h 2 #define FWCore_Framework_StreamSchedule_h 100 class BranchIDListHelper;
102 class ExceptionCollector;
103 class ExceptionToActionTable;
104 class OutputModuleCommunicator;
105 class ProcessContext;
108 class ModuleRegistry;
109 class TriggerResultInserter;
110 class PathStatusInserter;
111 class EndPathStatusInserter;
112 class PreallocationConfiguration;
116 class TriggerNamesService;
120 template <
typename T>
121 class StreamScheduleSignalSentry {
123 StreamScheduleSignalSentry(
ActivityRegistry*
a,
typename T::Context
const* context)
124 : a_(a), context_(context), allowThrow_(
false) {
126 T::preScheduleSignal(a_, context_);
131 T::postScheduleSignal(a_, context_);
140 void allowThrow() { allowThrow_ =
true; }
145 typename T::Context
const* context_;
166 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
167 std::shared_ptr<ModuleRegistry>,
174 std::shared_ptr<ActivityRegistry> areg,
175 std::shared_ptr<ProcessConfiguration> processConfiguration,
176 bool allowEarlyDelete,
182 void processOneEventAsync(
189 template <
typename T>
194 bool cleaningUpAfterException =
false);
207 std::vector<ModuleDescription const*> getAllModuleDescriptions()
const;
210 void availablePaths(std::vector<std::string>& oLabelsToFill)
const;
213 void modulesInPath(
std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill)
const;
215 void moduleDescriptionsInPath(
std::string const& iPathLabel,
216 std::vector<ModuleDescription const*>& descriptions,
217 unsigned int hint)
const;
219 void moduleDescriptionsInEndPath(
std::string const& iEndPathLabel,
220 std::vector<ModuleDescription const*>& descriptions,
221 unsigned int hint)
const;
238 void enableEndPaths(
bool active);
242 bool endPathsEnabled()
const;
249 void clearCounters();
255 AllWorkers
const&
allWorkers()
const {
return workerManager_.allWorkers(); }
269 : reg_(iReg), context_(iContext) {}
287 void finishedPaths(std::atomic<std::exception_ptr*>&,
291 std::exception_ptr finishProcessOneEvent(std::exception_ptr);
298 std::shared_ptr<ProcessConfiguration const> processConfiguration,
302 std::vector<std::string>
const& endPathNames);
306 std::shared_ptr<ProcessConfiguration const> processConfiguration,
310 std::vector<std::string>
const& endPathNames);
314 std::shared_ptr<ProcessConfiguration const> processConfiguration,
317 std::vector<std::string>
const& endPathNames);
319 void addToAllWorkers(
Worker*
w);
321 void resetEarlyDelete();
325 bool allowEarlyDelete);
330 void makePathStatusInserters(
332 std::vector<
edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
376 reportSvc->reportSkippedEvent(ep.
id().
run(), ep.
id().
event());
379 template <
typename T>
381 typename T::MyPrincipal& ep,
384 bool cleaningUpAfterException) {
385 T::setStreamContext(streamContext_, ep);
389 tbb::task::allocate_root(),
390 [
this, iHolder,
id, cleaningUpAfterException, token](std::exception_ptr
const* iPtr)
mutable {
391 std::exception_ptr excpt;
399 std::ostringstream ost;
401 ost <<
"Processing " << T::transitionName() <<
" " <<
id;
405 excpt = std::current_exception();
414 T::postScheduleSignal(actReg_.get(), &streamContext_);
417 excpt = std::current_exception();
427 T::preScheduleSignal(actReg_.get(), &streamContext_);
429 workerManager_.resetAll();
431 h.doneWaiting(std::current_exception());
435 for (
auto&
p : end_paths_) {
436 p.runAllModulesAsync<
T>(doneTask, ep, es, token, streamID_, &streamContext_);
439 for (
auto&
p : trig_paths_) {
440 p.runAllModulesAsync<
T>(doneTask, ep, es, token, streamID_, &streamContext_);
443 workerManager_.processOneOccurrenceAsync<
T>(
444 doneTask, ep, es, token, streamID_, &streamContext_, &streamContext_);
447 if (streamID_.value() == 0) {
451 tbb::task::spawn(*task);
453 tbb::task::enqueue(*task);
EventNumber_t event() const
StreamContext const * context_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
~SendTerminationSignalIfException()
std::atomic< bool > skippingEvent_
StreamContext const & context() const
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
auto wrap(F iFunc) -> decltype(iFunc())
edm::ActivityRegistry * reg_
TrigResConstPtr results() const
std::vector< Path > TrigPaths
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void completedSuccessfully()