CMS 3D CMS Logo

WorkerManager.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_WorkerManager_h
2 #define FWCore_Framework_WorkerManager_h
3 
4 /*
5 
6 */
7 
17 
18 #include <memory>
19 
20 #include <set>
21 #include <string>
22 #include <vector>
23 
24 namespace edm {
25  class ExceptionCollector;
26  class StreamID;
27  class StreamContext;
28  class ModuleRegistry;
29  class PreallocationConfiguration;
// Forward declaration for the type taken by WorkerManager::beginJob
// (eventsetup::ESRecordsToProxyIndices const&); only the name is needed here.
namespace eventsetup {
  class ESRecordsToProxyIndices;
}
// WorkerManager: declares job/stream lifecycle hooks (beginJob/endJob,
// beginStream/endStream), worker lookup and registration (getWorker,
// addToAllWorkers, addToUnscheduledWorkers) and the templated
// processOneOccurrence* / processAccumulatorsAsync entry points whose
// definitions follow the class in this header.
//
// NOTE(review): this text is a numbered source listing and several listing
// lines are absent (42, 47, 100-101, 103-104). The gaps are flagged where they
// occur; restore them from the original header before compiling.
33  class WorkerManager {
34  public:
    // Container type used for the full worker list: a vector of raw Worker pointers.
35  typedef std::vector<Worker*> AllWorkers;
36 
37  WorkerManager(std::shared_ptr<ActivityRegistry> actReg, ExceptionToActionTable const& actions);
    // Move construction is compiler-generated.
38  WorkerManager(WorkerManager&&) = default;
39 
40  WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
41  std::shared_ptr<ActivityRegistry> actReg,
    // NOTE(review): listing line 42 is missing here, so the remaining parameter(s)
    // and the closing `);` of this constructor are not visible. Presumably it ends
    // with the same `ExceptionToActionTable const& actions` as the constructor
    // above; confirm against the original header.
43  void addToUnscheduledWorkers(ParameterSet& pset,
44  ProductRegistry& preg,
45  PreallocationConfiguration const* prealloc,
46  std::shared_ptr<ProcessConfiguration> processConfiguration,
    // NOTE(review): listing line 47 is missing here; one parameter between
    // processConfiguration and unscheduledLabels is not visible.
48  std::set<std::string>& unscheduledLabels,
49  std::vector<std::string>& shouldBeUsedLabels);
50 
    // Blocking form: the definition below runs processOneOccurrenceAsync and
    // waits for it. cleaningUpAfterException defaults to false.
51  template <typename T, typename U>
52  void processOneOccurrence(typename T::MyPrincipal& principal,
53  EventSetupImpl const& eventSetup,
54  StreamID streamID,
55  typename T::Context const* topContext,
56  U const* context,
57  bool cleaningUpAfterException = false);
    // Asynchronous form: takes the WaitingTask and ServiceToken explicitly.
58  template <typename T, typename U>
59  void processOneOccurrenceAsync(WaitingTask* task,
60  typename T::MyPrincipal& principal,
61  EventSetupImpl const& eventSetup,
62  ServiceToken const& token,
63  StreamID streamID,
64  typename T::Context const* topContext,
65  U const* context);
66 
    // Unlike processOneOccurrenceAsync, this takes the principal by const
    // reference and a ParentContext instead of a top-level context pointer.
67  template <typename T>
68  void processAccumulatorsAsync(WaitingTask* task,
69  typename T::MyPrincipal const& ep,
70  EventSetupImpl const& es,
71  ServiceToken const& token,
72  StreamID streamID,
73  ParentContext const& parentContext,
74  typename T::Context const* context);
75 
76  void setupOnDemandSystem(Principal& principal, EventSetupImpl const& es);
77 
78  void beginJob(ProductRegistry const& iRegistry, eventsetup::ESRecordsToProxyIndices const&);
    // Two endJob overloads: one without arguments, one taking an ExceptionCollector.
79  void endJob();
80  void endJob(ExceptionCollector& collector);
81 
82  void beginStream(StreamID iID, StreamContext& streamContext);
83  void endStream(StreamID iID, StreamContext& streamContext);
84 
    // Read-only access to the worker list held in allWorkers_.
85  AllWorkers const& allWorkers() const { return allWorkers_; }
86 
87  void addToAllWorkers(Worker* w);
88 
    // Dereferences actionTable_ unconditionally, so it must be non-null when called.
89  ExceptionToActionTable const& actionTable() const { return *actionTable_; }
90 
91  Worker* getWorker(ParameterSet& pset,
92  ProductRegistry& preg,
93  PreallocationConfiguration const* prealloc,
94  std::shared_ptr<ProcessConfiguration const> processConfiguration,
95  std::string const& label);
96 
97  void resetAll();
98 
99  private:
    // NOTE(review): listing lines 100-101 and 103-104 are missing. Inline code in
    // this header uses actionTable_ (in actionTable()) and unscheduled_ (in the
    // *Async definitions), so those members are presumably declared on the
    // dropped lines; confirm against the original header.
102  AllWorkers allWorkers_;
105  };
106 
107  template <typename T, typename U>
108  void WorkerManager::processOneOccurrence(typename T::MyPrincipal& ep,
109  EventSetupImpl const& es,
110  StreamID streamID,
111  typename T::Context const* topContext,
112  U const* context,
113  bool cleaningUpAfterException) {
114  this->resetAll();
115 
116  auto waitTask = make_empty_waiting_task();
117  waitTask->increment_ref_count();
118  processOneOccurrenceAsync<T, U>(
119  waitTask.get(), ep, es, ServiceRegistry::instance().presentToken(), streamID, topContext, context);
120  waitTask->wait_for_all();
121  if (waitTask->exceptionPtr() != nullptr) {
122  try {
123  convertException::wrap([&]() { std::rethrow_exception(*(waitTask->exceptionPtr())); });
124  } catch (cms::Exception& ex) {
125  if (ex.context().empty()) {
127  "Calling function WorkerManager::processOneOccurrence", ex, cleaningUpAfterException);
128  } else {
129  addContextAndPrintException("", ex, cleaningUpAfterException);
130  }
131  throw;
132  }
133  }
134  }
135 
136  template <typename T, typename U>
138  typename T::MyPrincipal& ep,
139  EventSetupImpl const& es,
140  ServiceToken const& token,
141  StreamID streamID,
142  typename T::Context const* topContext,
143  U const* context) {
144  //make sure the unscheduled items see this run or lumi transition
145  unscheduled_.runNowAsync<T, U>(task, ep, es, token, streamID, topContext, context);
146  }
147 
148  template <typename T>
150  typename T::MyPrincipal const& ep,
151  EventSetupImpl const& es,
152  ServiceToken const& token,
153  StreamID streamID,
154  ParentContext const& parentContext,
155  typename T::Context const* context) {
156  unscheduled_.runAccumulatorsAsync<T>(task, ep, es, token, streamID, parentContext, context);
157  }
158 } // namespace edm
159 
160 #endif
roAction_t actions[nactions]
Definition: GenABIO.cc:181
const double w
Definition: UKUtility.cc:23
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
UnscheduledCallProducer unscheduled_
AllWorkers allWorkers_
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
ServiceToken presentToken() const
void beginJob()
Definition: Breakpoints.cc:14
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
std::vector< Worker * > AllWorkers
Definition: WorkerManager.h:35
char const * label
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::list< std::string > const & context() const
Definition: Exception.cc:147
static ServiceRegistry & instance()
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
HLT enums.
WorkerRegistry workerReg_
The Registry of all workers that were requested. Holds all instances of workers. In this implementati...
auto wrap(F iFunc) -> decltype(iFunc())
long double T
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89