CMS 3D CMS Logo

GlobalSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
3 
4 /*
5 */
6 
30 
31 #include <map>
32 #include <memory>
33 #include <set>
34 #include <string>
35 #include <vector>
36 #include <sstream>
37 #include "boost/range/adaptor/reversed.hpp"
38 
39 namespace edm {
40 
41  namespace {
42  template <typename T>
43  class GlobalScheduleSignalSentry {
44  public:
45  GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
46  : a_(a), context_(context), allowThrow_(false) {
47  if (a_)
48  T::preScheduleSignal(a_, context_);
49  }
50  ~GlobalScheduleSignalSentry() noexcept(false) {
51  // Caught exception is rethrown
52  CMS_SA_ALLOW try {
53  if (a_)
54  T::postScheduleSignal(a_, context_);
55  } catch (...) {
56  if (allowThrow_) {
57  throw;
58  }
59  }
60  }
61 
62  void allowThrow() { allowThrow_ = true; }
63 
64  private:
65  // We own none of these resources.
66  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
67  typename T::Context const* context_;
68  bool allowThrow_;
69  };
70  } // namespace
71 
72  class ActivityRegistry;
73  class ExceptionCollector;
74  class ProcessContext;
75  class PreallocationConfiguration;
76  class ModuleRegistry;
77  class TriggerResultInserter;
78  class PathStatusInserter;
79  class EndPathStatusInserter;
80 
82  public:
83  typedef std::vector<std::string> vstring;
84  typedef std::vector<Worker*> AllWorkers;
85  typedef std::shared_ptr<Worker> WorkerPtr;
86  typedef std::vector<Worker*> Workers;
87 
88  GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
89  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
90  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
91  std::shared_ptr<ModuleRegistry> modReg,
92  std::vector<std::string> const& modulesToUse,
93  ParameterSet& proc_pset,
94  ProductRegistry& pregistry,
95  PreallocationConfiguration const& prealloc,
97  std::shared_ptr<ActivityRegistry> areg,
98  std::shared_ptr<ProcessConfiguration const> processConfiguration,
99  ProcessContext const* processContext);
100  GlobalSchedule(GlobalSchedule const&) = delete;
101 
102  template <typename T>
104  typename T::TransitionInfoType&,
105  ServiceToken const& token,
106  bool cleaningUpAfterException = false);
107 
108  void beginJob(ProductRegistry const&,
110  ProcessBlockHelperBase const&);
111  void endJob(ExceptionCollector& collector);
112 
115 
119  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
120 
122  bool terminate() const;
123 
125  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
126 
128  void deleteModule(std::string const& iLabel);
129 
131  AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
132 
133  private:
134  //Sentry class to only send a signal if an
135  // exception occurs. An exception is identified
136  // by the destructor being called without first
137  // calling completedSuccessfully().
139  public:
141  : reg_(iReg), context_(iContext) {}
143  if (reg_) {
145  }
146  }
147  void completedSuccessfully() { reg_ = nullptr; }
148 
149  private:
150  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
152  };
153 
155  ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
156 
157  std::vector<WorkerManager> workerManagers_;
158  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
159  std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
162  };
163 
164  template <typename T>
166  typename T::TransitionInfoType& transitionInfo,
167  ServiceToken const& token,
168  bool cleaningUpAfterException) {
169  auto const& principal = transitionInfo.principal();
170 
171  // Caught exception is propagated via WaitingTaskHolder
172  CMS_SA_ALLOW try {
173  //need the doneTask to own the memory
174  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
175 
176  if (actReg_) {
177  //Services may depend upon each other
179  T::preScheduleSignal(actReg_.get(), globalContext.get());
180  }
181 
182  ServiceWeakToken weakToken = token;
183  auto doneTask = make_waiting_task(
184  [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
185  std::exception_ptr excpt;
186  if (iPtr) {
187  excpt = *iPtr;
188  //add context information to the exception and print message
189  try {
190  convertException::wrap([&]() { std::rethrow_exception(excpt); });
191  } catch (cms::Exception& ex) {
192  //TODO: should add the transition type info
193  std::ostringstream ost;
194  if (ex.context().empty()) {
195  ost << "Processing " << T::transitionName() << " ";
196  }
197  ServiceRegistry::Operate op(weakToken.lock());
198  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
199  excpt = std::current_exception();
200  }
201  if (actReg_) {
202  ServiceRegistry::Operate op(weakToken.lock());
203  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
204  }
205  }
206  if (actReg_) {
207  // Caught exception is propagated via WaitingTaskHolder
208  CMS_SA_ALLOW try {
209  ServiceRegistry::Operate op(weakToken.lock());
210  T::postScheduleSignal(actReg_.get(), globalContext.get());
211  } catch (...) {
212  if (not excpt) {
213  excpt = std::current_exception();
214  }
215  }
216  }
217  iHolder.doneWaiting(excpt);
218  });
219  unsigned int managerIndex = principal.index();
220  if constexpr (T::branchType_ == InRun) {
221  managerIndex += numberOfConcurrentLumis_;
222  }
223  WorkerManager& workerManager = workerManagers_[managerIndex];
224  workerManager.resetAll();
225 
226  ParentContext parentContext(globalContext.get());
227  //make sure the ProductResolvers know about their
228  // workers to allow proper data dependency handling
229  workerManager.setupResolvers(transitionInfo.principal());
230 
231  //make sure the task doesn't get run until all workers have beens started
232  WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
233  auto& aw = workerManager.allWorkers();
234  for (Worker* worker : boost::adaptors::reverse(aw)) {
235  worker->doWorkAsync<T>(
236  holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
237  }
238  } catch (...) {
239  iHolder.doneWaiting(std::current_exception());
240  }
241  }
242 } // namespace edm
243 
244 #endif
std::string_view transitionName(GlobalContext::Transition)
std::vector< std::string > vstring
#define CMS_SA_ALLOW
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::vector< Worker * > AllWorkers
ExceptionToActionTable const & actionTable() const
returns the action table
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
static StreamID invalidStreamID()
Definition: StreamID.h:45
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
oneapi::tbb::task_group * group() const noexcept
bool terminate() const
Return whether each output module has reached its maximum count.
void doneWaiting(std::exception_ptr iExcept)
std::vector< Worker * > Workers
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
std::shared_ptr< Worker > WorkerPtr
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:88
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void endJob(ExceptionCollector &collector)
ServiceToken lock() const
Definition: ServiceToken.h:101
std::shared_ptr< ActivityRegistry > actReg_
PreGlobalEarlyTermination preGlobalEarlyTerminationSignal_
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::GlobalContext const *iContext)
HLT enums.
void setupResolvers(Principal &principal)
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
double a
Definition: hdecay.h:121
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &)
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
unsigned int numberOfConcurrentLumis_
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_