CMS 3D CMS Logo

GlobalSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
3 
30 
31 #include <exception>
32 #include <map>
33 #include <memory>
34 #include <set>
35 #include <string>
36 #include <vector>
37 #include <sstream>
38 #include "boost/range/adaptor/reversed.hpp"
39 
40 namespace edm {
41 
// Forward declarations of types that the declarations below use only by
// reference, by pointer, or inside std::shared_ptr.
42  class ExceptionCollector;
43  class PreallocationConfiguration;
44  class ModuleRegistry;
45  class TriggerResultInserter;
46  class PathStatusInserter;
47  class EndPathStatusInserter;
48 
50  public:
// NOTE(review): this is the interior of the GlobalSchedule class declaration.
// The listing omits several original lines (e.g. 49, 64, 71, 77-79), so the
// class header and some declaration lines are not visible here.
51  typedef std::vector<std::string> vstring;
52  typedef std::vector<Worker*> AllWorkers;
53  typedef std::shared_ptr<Worker> WorkerPtr;
54  typedef std::vector<Worker*> Workers;
55 
// Constructor.
// NOTE(review): listing line 64 is not shown; the cross-reference index lists an
// `ExceptionToActionTable const& actions` parameter between `prealloc` and `areg`.
56  GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
57  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
58  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
59  std::shared_ptr<ModuleRegistry> modReg,
60  std::vector<std::string> const& modulesToUse,
61  ParameterSet& proc_pset,
62  ProductRegistry& pregistry,
63  PreallocationConfiguration const& prealloc,
65  std::shared_ptr<ActivityRegistry> areg,
66  std::shared_ptr<ProcessConfiguration const> processConfiguration,
67  ProcessContext const* processContext);
// Copy construction is disabled.
68  GlobalSchedule(GlobalSchedule const&) = delete;
69 
// Declaration of processOneGlobalAsync.
// NOTE(review): listing line 71 (the line carrying the return type, name and first
// parameter) is not shown; the cross-reference index gives it as
// `void processOneGlobalAsync(WaitingTaskHolder holder, ...)`.
70  template <typename T>
72  typename T::TransitionInfoType&,
73  ServiceToken const& token,
74  bool cleaningUpAfterException = false);
75 
// NOTE(review): listing lines 77-79 are not shown; the cross-reference index lists
// three further parameters between the first and the last one
// (ESRecordsToProductResolverIndices, ProcessBlockHelperBase, PathsAndConsumesOfModulesBase).
76  void beginJob(ProductRegistry const&,
80  ProcessContext const&);
81  void endJob(ExceptionCollector& collector);
82 
85 
89  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
90 
// Return whether each output module has reached its maximum count.
92  bool terminate() const;
93 
// Clone the type of module with label iLabel but configure with iPSet.
// NOTE(review): the brief above is taken from the cross-reference index; it mentions
// `iPSet`, but the signature takes `iMod` -- confirm which is intended.
95  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
96 
// Delete the module with label iLabel.
98  void deleteModule(std::string const& iLabel);
99 
// Returns the collection of pointers to workers, as held by the first
// WorkerManager (workerManagers_[0]).
101  AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
102 
103  private:
// Returns the action table, as held by the first WorkerManager (workerManagers_[0]).
105  ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
106 
// Emits T's pre-schedule signal; defined below in this header.
107  template <typename T>
108  void preScheduleSignal(GlobalContext const*, ServiceToken const&);
109 
// Emits T's post-schedule signal; the std::exception_ptr is an in/out parameter.
// Defined below in this header.
110  template <typename T>
111  void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
112 
// Called by processOneGlobalAsync when a transition finished with an exception
// ("add context information to the exception and print message").
113  void handleException(GlobalContext const*,
114  ServiceWeakToken const&,
115  bool cleaningUpAfterException,
116  std::exception_ptr&);
117 
// One WorkerManager per concurrent global transition; indexed in processOneGlobalAsync.
118  std::vector<WorkerManager> workerManagers_;
119  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
120  std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
122 
123  // The next 4 variables use the same naming convention, even though we have no intention
124  // to ever have concurrent ProcessBlocks or Jobs. They are all related to the number of
125  // WorkerManagers needed for global transitions.
// NOTE(review): listing lines 126-128 are not shown; the cross-reference index names the
// other three as numberOfConcurrentLumis_, numberOfConcurrentRuns_ and
// numberOfConcurrentProcessBlocks_.
129  static constexpr unsigned int numberOfConcurrentJobs_ = 1;
130  };
131 
// Out-of-class definition of GlobalSchedule::processOneGlobalAsync<T>.
// NOTE(review): listing line 133 (return type, qualified name and the first parameter,
// used below as `iHolder`) is not shown.
//
// Starts transition T asynchronously on every worker of the WorkerManager selected
// for this principal. When all workers are done, a completion task runs the
// post-schedule signal and then notifies iHolder, passing along any exception.
// No exception leaves this function: failures are handed to a WaitingTaskHolder.
132  template <typename T>
134  typename T::TransitionInfoType& transitionInfo,
135  ServiceToken const& token,
136  bool cleaningUpAfterException) {
137  auto const& principal = transitionInfo.principal();
138 
139  // Caught exception is propagated via WaitingTaskHolder
140  CMS_SA_ALLOW try {
141  // The completion task (doneTask) must own the GlobalContext, hence the shared_ptr
142  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
143 
// The completion task captures a weak token rather than the ServiceToken itself.
144  ServiceWeakToken weakToken = token;
145  auto doneTask = make_waiting_task(
146  [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
147  std::exception_ptr excpt;
148  if (iPtr) {
149  excpt = *iPtr;
150  // add context information to the exception and print message
151  handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
152  }
// The post signal is emitted whether or not the transition failed.
153  postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
154  iHolder.doneWaiting(excpt);
155  });
156 
157  // Make sure the task doesn't get run until all workers have been started
158  WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
159 
// Failures inside this inner try are reported through holdForLoop, so they reach
// doneTask (and therefore handleException and the post signal).
160  CMS_SA_ALLOW try {
161  preScheduleSignal<T>(globalContext.get(), token);
162 
// Pick the WorkerManager: start from the principal's index and, for Runs,
// offset it by numberOfConcurrentLumis_.
// NOTE(review): listing line 167 (the body of the InProcess branch) is not shown.
163  unsigned int managerIndex = principal.index();
164  if constexpr (T::branchType_ == InRun) {
165  managerIndex += numberOfConcurrentLumis_;
166  } else if constexpr (T::branchType_ == InProcess) {
168  }
169  WorkerManager& workerManager = workerManagers_[managerIndex];
170  workerManager.resetAll();
171 
172  ParentContext parentContext(globalContext.get());
173  // make sure the ProductResolvers know about their
174  // workers to allow proper data dependency handling
175  workerManager.setupResolvers(transitionInfo.principal());
176 
// Workers are started in reverse order of allWorkers().
177  auto& aw = workerManager.allWorkers();
178  for (Worker* worker : boost::adaptors::reverse(aw)) {
179  worker->doWorkAsync<T>(
180  holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
181  }
182  } catch (...) {
183  holdForLoop.doneWaiting(std::current_exception());
184  }
// Failures outside the inner try (e.g. while creating the context or the task)
// are reported directly to iHolder.
185  } catch (...) {
186  iHolder.doneWaiting(std::current_exception());
187  }
188  }
189 
// Out-of-class definition of GlobalSchedule::preScheduleSignal<T>.
// NOTE(review): listing lines 191 (the signature) and 194 are not shown; per the
// in-class declaration the parameters are (GlobalContext const*, ServiceToken const&).
//
// If an ActivityRegistry is set, emits T's pre-schedule signal through
// convertException::wrap. A cms::Exception gets context information added
// and is then rethrown to the caller.
190  template <typename T>
192  if (actReg_) {
193  try {
195  convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
196  } catch (cms::Exception& ex) {
197  exceptionContext(ex, *globalContext, "Handling pre signal, likely in a service function");
198  throw;
199  }
200  }
201  }
202 
// Out-of-class definition of GlobalSchedule::postScheduleSignal<T>.
// NOTE(review): listing line 204 (return type, qualified name and the first
// parameter, used below as `globalContext`) is not shown.
//
// If an ActivityRegistry is set, emits T's post-schedule signal with the services
// obtained from weakToken.lock() made active for the duration of the call.
// `excpt` is in/out: a cms::Exception caught here is stored into it only when it
// is still empty, so an earlier failure is never overwritten. Nothing is rethrown.
203  template <typename T>
205  ServiceWeakToken const& weakToken,
206  std::exception_ptr& excpt) {
207  if (actReg_) {
208  try {
209  convertException::wrap([this, &weakToken, globalContext]() {
210  ServiceRegistry::Operate op(weakToken.lock());
211  T::postScheduleSignal(actReg_.get(), globalContext);
212  });
213  } catch (cms::Exception& ex) {
// Keep the first exception: only record this one if none was pending.
214  if (not excpt) {
215  exceptionContext(ex, *globalContext, "Handling post signal, likely in a service function");
216  excpt = std::current_exception();
217  }
218  }
219  }
220  }
221 
222 } // namespace edm
223 
224 #endif
std::vector< std::string > vstring
#define CMS_SA_ALLOW
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > AllWorkers
ExceptionToActionTable const & actionTable() const
returns the action table
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
static StreamID invalidStreamID()
Definition: StreamID.h:45
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
oneapi::tbb::task_group * group() const noexcept
bool terminate() const
Return whether each output module has reached its maximum count.
void preScheduleSignal(GlobalContext const *, ServiceToken const &)
std::vector< Worker * > Workers
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
std::shared_ptr< Worker > WorkerPtr
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:82
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void endJob(ExceptionCollector &collector)
void postScheduleSignal(GlobalContext const *, ServiceWeakToken const &, std::exception_ptr &)
ServiceToken lock() const
Definition: ServiceToken.h:101
void doneWaiting(std::exception_ptr iExcept) noexcept
std::shared_ptr< ActivityRegistry > actReg_
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, PathsAndConsumesOfModulesBase const &, ProcessContext const &)
unsigned int numberOfConcurrentRuns_
void handleException(GlobalContext const *, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &)
HLT enums.
void setupResolvers(Principal &principal)
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
static constexpr unsigned int numberOfConcurrentProcessBlocks_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
long double T
unsigned int numberOfConcurrentLumis_
static constexpr unsigned int numberOfConcurrentJobs_
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_