1 #ifndef FWCore_Framework_WorkerManager_h 2 #define FWCore_Framework_WorkerManager_h 25 class ExceptionCollector;
29 class PreallocationConfiguration;
39 std::shared_ptr<ActivityRegistry> actReg,
44 std::shared_ptr<ProcessConfiguration> processConfiguration,
46 std::set<std::string>& unscheduledLabels,
47 std::vector<std::string>& shouldBeUsedLabels);
51 template <
typename T,
typename U>
55 typename T::Context
const* topContext,
57 bool cleaningUpAfterException =
false);
58 template <
typename T,
typename U>
65 typename T::Context
const* topContext,
70 typename T::MyPrincipal
const& ep,
75 typename T::Context
const* context);
95 std::shared_ptr<ProcessConfiguration const> processConfiguration,
109 template <
typename T,
typename U>
114 typename T::Context
const* topContext,
116 bool cleaningUpAfterException) {
120 waitTask->increment_ref_count();
122 waitTask->wait_for_all();
123 if(waitTask->exceptionPtr() !=
nullptr) {
126 std::rethrow_exception(* (waitTask->exceptionPtr()) );
139 template <
typename T,
typename U>
142 typename T::MyPrincipal& ep,
146 typename T::Context
const* topContext,
152 template <
typename T>
155 typename T::MyPrincipal
const& ep,
160 typename T::Context
const* context) {
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
roAction_t actions[nactions]
void endStream(StreamID iID, StreamContext &streamContext)
UnscheduledCallProducer unscheduled_
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void beginJob(ProductRegistry const &iRegistry)
ServiceToken presentToken() const
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
std::vector< Worker * > AllWorkers
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::list< std::string > const & context() const
static ServiceRegistry & instance()
void beginStream(StreamID iID, StreamContext &streamContext)
AllWorkers const & allWorkers() const
void runNowAsync(WaitingTask *task, typename T::MyPrincipal &p, EventSetup const &es, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context) const
void runAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
WorkerRegistry workerReg_
The Registry of all workers that where requested Holds all instances of workers. In this implementati...
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
ExceptionToActionTable const & actionTable() const
void addToAllWorkers(Worker *w)