CMS 3D CMS Logo

GlobalSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
3 
29 
30 #include <exception>
31 #include <map>
32 #include <memory>
33 #include <set>
34 #include <string>
35 #include <vector>
36 #include <sstream>
37 #include "boost/range/adaptor/reversed.hpp"
38 
39 namespace edm {
40 
41  class ActivityRegistry;
42  class ExceptionCollector;
43  class ProcessContext;
44  class PreallocationConfiguration;
45  class ModuleRegistry;
46  class TriggerResultInserter;
47  class PathStatusInserter;
48  class EndPathStatusInserter;
49 
// Public interface of GlobalSchedule: type aliases, construction, and the
// entry points used to run global transitions over the workers.
51  public:
52  typedef std::vector<std::string> vstring;
53  typedef std::vector<Worker*> AllWorkers;
54  typedef std::shared_ptr<Worker> WorkerPtr;
55  typedef std::vector<Worker*> Workers;
56 
// Constructor. The status inserters are taken by non-const reference; the
// registries and process configuration are passed as shared_ptr.
57  GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
58  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
59  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
60  std::shared_ptr<ModuleRegistry> modReg,
61  std::vector<std::string> const& modulesToUse,
62  ParameterSet& proc_pset,
63  ProductRegistry& pregistry,
64  PreallocationConfiguration const& prealloc,
66  std::shared_ptr<ActivityRegistry> areg,
67  std::shared_ptr<ProcessConfiguration const> processConfiguration,
68  ProcessContext const* processContext);
// Copy construction is disabled.
69  GlobalSchedule(GlobalSchedule const&) = delete;
70 
// Declaration of the templated asynchronous transition entry point
// (presumably processOneGlobalAsync, whose definition follows the class).
// cleaningUpAfterException defaults to false.
71  template <typename T>
73  typename T::TransitionInfoType&,
74  ServiceToken const& token,
75  bool cleaningUpAfterException = false);
76 
// Job-level begin/end hooks; endJob receives an ExceptionCollector.
77  void beginJob(ProductRegistry const&,
79  ProcessBlockHelperBase const&);
80  void endJob(ExceptionCollector& collector);
81 
84 
// Returns pointers to module descriptions (non-owning, const pointees).
88  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
89 
// Return whether each output module has reached its maximum count.
91  bool terminate() const;
92 
// Replaces the module registered under iLabel using the supplied holder.
// NOTE(review): exact replacement semantics are not visible here - confirm in the implementation.
94  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
95 
// Delete the module with label iLabel.
97  void deleteModule(std::string const& iLabel);
98 
// Returns the collection of pointers to workers, taken from the first worker manager.
100  AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
101 
102  private:
// Returns the action table, taken from the first worker manager.
104  ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
105 
// Emits the pre-transition signal for transition type T.
106  template <typename T>
107  void preScheduleSignal(GlobalContext const*, ServiceToken const&);
108 
// Emits the post-transition signal for transition type T; the exception_ptr
// is an in/out argument.
109  template <typename T>
110  void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
111 
// Called when a transition finished with an exception; the exception_ptr is
// passed by non-const reference.
112  void handleException(GlobalContext const*,
113  ServiceWeakToken const&,
114  bool cleaningUpAfterException,
115  std::exception_ptr&);
116 
// One WorkerManager per concurrently processed transition; indexed in
// processOneGlobalAsync by the principal's index (offset for runs).
117  std::vector<WorkerManager> workerManagers_;
118  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
119  std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
122  };
123 
// Starts one global transition of type T on every worker of the selected
// WorkerManager, asynchronously.
//
// A GlobalContext is built for the transition, a completion task (doneTask)
// is created that adds exception context if needed, emits the post-transition
// signal and then notifies iHolder, and all workers are launched with
// doWorkAsync. Exceptions thrown while setting up are not propagated to the
// caller; they are handed to the waiting-task holders instead.
//
// transitionInfo            supplies the principal for this transition.
// token                     service token forwarded to the signals and workers.
// cleaningUpAfterException  forwarded to handleException.
124  template <typename T>
126  typename T::TransitionInfoType& transitionInfo,
127  ServiceToken const& token,
128  bool cleaningUpAfterException) {
129  auto const& principal = transitionInfo.principal();
130 
131  // Caught exception is propagated via WaitingTaskHolder
132  CMS_SA_ALLOW try {
133  //need the doneTask to own the memory
134  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
135 
  // The lambda copies the shared_ptr and a weak service token, so the context
  // stays alive until the completion task has run.
136  ServiceWeakToken weakToken = token;
137  auto doneTask = make_waiting_task(
138  [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
139  std::exception_ptr excpt;
140  if (iPtr) {
141  excpt = *iPtr;
142  // add context information to the exception and print message
143  handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
144  }
  // The post signal is emitted whether or not the transition failed.
145  postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
146  iHolder.doneWaiting(excpt);
147  });
148 
149  //make sure the task doesn't get run until all workers have been started
150  WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
151 
152  CMS_SA_ALLOW try {
153  preScheduleSignal<T>(globalContext.get(), token);
154 
  // Select the WorkerManager by principal index; for run transitions the
  // index is shifted by numberOfConcurrentLumis_.
155  unsigned int managerIndex = principal.index();
156  if constexpr (T::branchType_ == InRun) {
157  managerIndex += numberOfConcurrentLumis_;
158  }
159  WorkerManager& workerManager = workerManagers_[managerIndex];
160  workerManager.resetAll();
161 
162  ParentContext parentContext(globalContext.get());
163  // make sure the ProductResolvers know about their
164  // workers to allow proper data dependency handling
165  workerManager.setupResolvers(transitionInfo.principal());
166 
  // Workers are launched in reverse order of the allWorkers() container.
167  auto& aw = workerManager.allWorkers();
168  for (Worker* worker : boost::adaptors::reverse(aw)) {
169  worker->doWorkAsync<T>(
170  holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
171  }
172  } catch (...) {
  // Failure after doneTask exists: report through holdForLoop.
173  holdForLoop.doneWaiting(std::current_exception());
174  }
175  } catch (...) {
  // Failure while creating the context or task: report directly to the caller's holder.
176  iHolder.doneWaiting(std::current_exception());
177  }
178  }
179 
// Emits the pre-transition signal for transition type T through the activity
// registry, if one is set. When the signal throws a cms::Exception, context
// describing the signal and the GlobalContext is added and the exception is
// rethrown to the caller.
180  template <typename T>
182  if (actReg_) {
183  try {
185  convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
186  } catch (cms::Exception& ex) {
187  std::ostringstream ost;
188  ex.addContext("Handling pre signal, likely in a service function");
  // Format the global context into ost and attach it as further context.
189  exceptionContext(ost, *globalContext);
190  ex.addContext(ost.str());
191  throw;
192  }
193  }
194  }
195 
// Emits the post-transition signal for transition type T through the activity
// registry, if one is set. Unlike the pre signal, nothing is rethrown: a
// cms::Exception from the signal is stored in excpt, but only when excpt is
// still empty, so an earlier exception is kept and the later one is dropped.
196  template <typename T>
198  ServiceWeakToken const& weakToken,
199  std::exception_ptr& excpt) {
200  if (actReg_) {
201  try {
202  convertException::wrap([this, &weakToken, globalContext]() {
  // Obtain a ServiceToken from the weak token for the duration of the signal.
203  ServiceRegistry::Operate op(weakToken.lock());
204  T::postScheduleSignal(actReg_.get(), globalContext);
205  });
206  } catch (cms::Exception& ex) {
207  if (not excpt) {
208  std::ostringstream ost;
209  ex.addContext("Handling post signal, likely in a service function");
210  exceptionContext(ost, *globalContext);
211  ex.addContext(ost.str());
212  excpt = std::current_exception();
213  }
214  }
215  }
216  }
217 
218 } // namespace edm
219 
220 #endif
std::vector< std::string > vstring
#define CMS_SA_ALLOW
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
Clone the type of module with label iLabel but configure it with iPSet.
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > AllWorkers
ExceptionToActionTable const & actionTable() const
returns the action table
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
static StreamID invalidStreamID()
Definition: StreamID.h:45
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
oneapi::tbb::task_group * group() const noexcept
bool terminate() const
Return whether each output module has reached its maximum count.
void preScheduleSignal(GlobalContext const *, ServiceToken const &)
std::vector< Worker * > Workers
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
std::shared_ptr< Worker > WorkerPtr
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:80
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void endJob(ExceptionCollector &collector)
void postScheduleSignal(GlobalContext const *, ServiceWeakToken const &, std::exception_ptr &)
ServiceToken lock() const
Definition: ServiceToken.h:101
void doneWaiting(std::exception_ptr iExcept) noexcept
std::shared_ptr< ActivityRegistry > actReg_
void addContext(std::string const &context)
Definition: Exception.cc:169
void handleException(GlobalContext const *, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &)
HLT enums.
void setupResolvers(Principal &principal)
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &)
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
long double T
unsigned int numberOfConcurrentLumis_
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_