CMS 3D CMS Logo

WorkerManager.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_WorkerManager_h
2 #define FWCore_Framework_WorkerManager_h
3 
4 /*
5 
6 */
7 
17 
18 #include <memory>
19 
20 #include <set>
21 #include <string>
22 #include <vector>
23 
24 namespace edm {
25  class ExceptionCollector;
26  class StreamID;
27  class StreamContext;
28  class ModuleRegistry;
29  class PreallocationConfiguration;
30 
31  class WorkerManager {
32  public:
33  typedef std::vector<Worker*> AllWorkers;
34 
35  WorkerManager(std::shared_ptr<ActivityRegistry> actReg, ExceptionToActionTable const& actions);
36  WorkerManager(WorkerManager&&) = default;
37 
38  WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
39  std::shared_ptr<ActivityRegistry> actReg,
42  ProductRegistry& preg,
43  PreallocationConfiguration const* prealloc,
44  std::shared_ptr<ProcessConfiguration> processConfiguration,
46  std::set<std::string>& unscheduledLabels,
47  std::vector<std::string>& shouldBeUsedLabels);
48 
49  void setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const;
50 
51  template <typename T, typename U>
52  void processOneOccurrence(typename T::MyPrincipal& principal,
53  EventSetup const& eventSetup,
54  StreamID streamID,
55  typename T::Context const* topContext,
56  U const* context,
57  bool cleaningUpAfterException = false);
58  template <typename T, typename U>
60  WaitingTask* task,
61  typename T::MyPrincipal& principal,
62  EventSetup const& eventSetup,
63  ServiceToken const& token,
64  StreamID streamID,
65  typename T::Context const* topContext,
66  U const* context);
67 
68  template <typename T>
70  typename T::MyPrincipal const& ep,
71  EventSetup const& es,
72  ServiceToken const& token,
73  StreamID streamID,
74  ParentContext const& parentContext,
75  typename T::Context const* context);
76 
78 
79  void beginJob(ProductRegistry const& iRegistry);
80  void endJob();
81  void endJob(ExceptionCollector& collector);
82 
83  void beginStream(StreamID iID, StreamContext& streamContext);
84  void endStream(StreamID iID, StreamContext& streamContext);
85 
86  AllWorkers const& allWorkers() const {return allWorkers_;}
87 
88  void addToAllWorkers(Worker* w);
89 
91 
93  ProductRegistry& preg,
94  PreallocationConfiguration const* prealloc,
95  std::shared_ptr<ProcessConfiguration const> processConfiguration,
96  std::string const& label);
97 
98  void resetAll();
99 
100  private:
101 
104  AllWorkers allWorkers_;
107  };
108 
109  template <typename T, typename U>
110  void
111  WorkerManager::processOneOccurrence(typename T::MyPrincipal& ep,
112  EventSetup const& es,
113  StreamID streamID,
114  typename T::Context const* topContext,
115  U const* context,
116  bool cleaningUpAfterException) {
117  this->resetAll();
118 
119  auto waitTask = make_empty_waiting_task();
120  waitTask->increment_ref_count();
121  processOneOccurrenceAsync<T,U>(waitTask.get(), ep, es, ServiceRegistry::instance().presentToken(), streamID, topContext, context);
122  waitTask->wait_for_all();
123  if(waitTask->exceptionPtr() != nullptr) {
124  try{
125  convertException::wrap([&]() {
126  std::rethrow_exception(* (waitTask->exceptionPtr()) );
127  });
128  } catch(cms::Exception& ex) {
129  if (ex.context().empty()) {
130  addContextAndPrintException("Calling function WorkerManager::processOneOccurrence", ex, cleaningUpAfterException);
131  } else {
132  addContextAndPrintException("", ex, cleaningUpAfterException);
133  }
134  throw;
135  }
136  }
137  }
138 
139  template <typename T, typename U>
140  void
142  typename T::MyPrincipal& ep,
143  EventSetup const& es,
144  ServiceToken const& token,
145  StreamID streamID,
146  typename T::Context const* topContext,
147  U const* context) {
148  //make sure the unscheduled items see this run or lumi transition
149  unscheduled_.runNowAsync<T,U>(task,ep, es, token, streamID, topContext, context);
150  }
151 
152  template <typename T>
153  void
155  typename T::MyPrincipal const& ep,
156  EventSetup const& es,
157  ServiceToken const& token,
158  StreamID streamID,
159  ParentContext const& parentContext,
160  typename T::Context const* context) {
161  unscheduled_.runAccumulatorsAsync<T>(task, ep, es, token, streamID, parentContext, context);
162  }
163 }
164 
165 #endif
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
UnscheduledCallProducer unscheduled_
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
AllWorkers allWorkers_
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void beginJob(ProductRegistry const &iRegistry)
ServiceToken presentToken() const
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
std::vector< Worker * > AllWorkers
Definition: WorkerManager.h:33
def principal(options)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
void beginStream(StreamID iID, StreamContext &streamContext)
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:86
void runNowAsync(WaitingTask *task, typename T::MyPrincipal &p, EventSetup const &es, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context) const
HLT enums.
void runAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
WorkerRegistry workerReg_
The Registry of all workers that where requested Holds all instances of workers. In this implementati...
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
long double T
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:90
void addToAllWorkers(Worker *w)