1 #ifndef FWCore_Framework_StreamSchedule_h 2 #define FWCore_Framework_StreamSchedule_h 99 class BranchIDListHelper;
101 class ExceptionCollector;
102 class OutputModuleCommunicator;
103 class ProcessContext;
106 class ModuleRegistry;
107 class TriggerResultInserter;
108 class PreallocationConfiguration;
112 class TriggerNamesService;
116 template <
typename T>
117 class StreamScheduleSignalSentry {
119 StreamScheduleSignalSentry(
ActivityRegistry*
a,
typename T::Context
const* context) :
120 a_(a), context_(context), allowThrow_(
false) {
121 if (a_) T::preScheduleSignal(a_, context_);
125 if (a_) { T::postScheduleSignal(a_, context_); }
127 if(allowThrow_) {
throw;}
138 typename T::Context
const* context_;
158 std::shared_ptr<ModuleRegistry>,
165 std::shared_ptr<ActivityRegistry> areg,
166 std::shared_ptr<ProcessConfiguration> processConfiguration,
167 bool allowEarlyDelete,
177 template <
typename T>
178 void processOneStream(
typename T::MyPrincipal&
principal,
180 bool cleaningUpAfterException =
false);
182 template <
typename T>
186 bool cleaningUpAfterException =
false);
199 std::vector<ModuleDescription const*> getAllModuleDescriptions()
const;
202 void availablePaths(std::vector<std::string>& oLabelsToFill)
const;
207 void triggerPaths(std::vector<std::string>& oLabelsToFill)
const;
210 void endPaths(std::vector<std::string>& oLabelsToFill)
const;
214 std::vector<std::string>& oLabelsToFill)
const;
216 void moduleDescriptionsInPath(
std::string const& iPathLabel,
217 std::vector<ModuleDescription const*>& descriptions,
218 unsigned int hint)
const;
220 void moduleDescriptionsInEndPath(
std::string const& iEndPathLabel,
221 std::vector<ModuleDescription const*>& descriptions,
222 unsigned int hint)
const;
228 return total_events_;
234 return total_passed_;
240 return totalEvents() - totalEventsPassed();
245 void enableEndPaths(
bool active);
249 bool endPathsEnabled()
const;
256 void clearCounters();
263 return workerManager_.allWorkers();
267 return number_of_unscheduled_modules_;
296 return workerManager_.actionTable();
304 std::exception_ptr finishProcessOneEvent(std::exception_ptr);
306 template <
typename T>
307 bool runTriggerPaths(
typename T::MyPrincipal
const&,
EventSetup const&,
typename T::Context
const*);
309 template <
typename T>
310 void runEndPaths(
typename T::MyPrincipal
const&,
EventSetup const&,
typename T::Context
const*);
317 std::shared_ptr<ProcessConfiguration const> processConfiguration,
319 vstring* labelsOnPaths);
323 std::shared_ptr<ProcessConfiguration const> processConfiguration,
325 vstring* labelsOnTriggerPaths);
329 std::shared_ptr<ProcessConfiguration const> processConfiguration,
332 void addToAllWorkers(
Worker*
w);
334 void resetEarlyDelete();
338 bool allowEarlyDelete);
386 reportSvc->reportSkippedEvent(ep.
id().
run(), ep.
id().
event());
389 template <
typename T>
392 bool cleaningUpAfterException) {
395 T::setStreamContext(streamContext_, ep);
396 StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
401 workerManager_.processOneOccurrence<
T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
405 runTriggerPaths<T>(ep, es, &streamContext_);
407 if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
418 terminationSentry.completedSuccessfully();
424 template <
typename T>
426 typename T::MyPrincipal& ep,
428 bool cleaningUpAfterException) {
431 T::setStreamContext(streamContext_, ep);
435 [
this,iHolder,
id,cleaningUpAfterException,token](std::exception_ptr
const* iPtr)
mutable 438 std::exception_ptr excpt;
444 std::rethrow_exception(excpt);
448 std::ostringstream ost;
450 ost<<
"Processing "<<T::transitionName()<<
" "<<
id;
453 excpt = std::current_exception();
460 T::postScheduleSignal(actReg_.get(), &streamContext_);
463 excpt = std::current_exception();
470 auto task =
make_functor_task(tbb::task::allocate_root(), [
this,doneTask,&ep,&es,cleaningUpAfterException,token] ()
mutable {
472 T::preScheduleSignal(actReg_.get(), &streamContext_);
475 workerManager_.resetAll();
476 for(
auto&
p : end_paths_) {
477 p.runAllModulesAsync<
T>(doneTask, ep, es, streamID_, &streamContext_);
480 for(
auto&
p : trig_paths_) {
481 p.runAllModulesAsync<
T>(doneTask, ep, es, streamID_, &streamContext_);
484 workerManager_.processOneOccurrenceAsync<
T>(doneTask,
485 ep, es, streamID_, &streamContext_, &streamContext_);
488 if(streamID_.value() == 0) {
492 tbb::task::spawn( *task);
494 tbb::task::enqueue( *task);
499 template <
typename T>
502 for(
auto&
p : trig_paths_) {
503 p.processOneOccurrence<
T>(ep, es, streamID_, context);
505 return results_->accept();
508 template <
typename T>
513 for(
auto&
p : end_paths_) {
514 p.processOneOccurrence<
T>(ep, es, streamID_, context);
EventNumber_t event() const
StreamContext const * context_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
vstring end_path_name_list_
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
ServiceToken presentToken() const
void runEndPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
static ServiceRegistry & instance()
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
vstring empty_trig_path_names_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
~SendTerminationSignalIfException()
std::atomic< bool > skippingEvent_
StreamContext const & context() const
auto wrap(F iFunc) -> decltype(iFunc())
bool runTriggerPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
edm::ActivityRegistry * reg_
static std::string const triggerPaths
TrigResConstPtr results() const
std::vector< Path > TrigPaths
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void completedSuccessfully()