CMS 3D CMS Logo

GlobalSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
3 
4 /*
5 */
6 
24 
25 #include <map>
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <vector>
30 #include <sstream>
31 #include "boost/range/adaptor/reversed.hpp"
32 
33 namespace edm {
34 
35  namespace {
36  template <typename T>
37  class GlobalScheduleSignalSentry {
38  public:
39  GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
40  : a_(a), context_(context), allowThrow_(false) {
41  if (a_)
42  T::preScheduleSignal(a_, context_);
43  }
44  ~GlobalScheduleSignalSentry() noexcept(false) {
45  try {
46  if (a_)
47  T::postScheduleSignal(a_, context_);
48  } catch (...) {
49  if (allowThrow_) {
50  throw;
51  }
52  }
53  }
54 
55  void allowThrow() { allowThrow_ = true; }
56 
57  private:
58  // We own none of these resources.
59  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
60  typename T::Context const* context_;
61  bool allowThrow_;
62  };
63  } // namespace
64 
65  class ActivityRegistry;
66  class EventSetupImpl;
67  class ExceptionCollector;
68  class ProcessContext;
69  class PreallocationConfiguration;
70  class ModuleRegistry;
71  class TriggerResultInserter;
72  class PathStatusInserter;
73  class EndPathStatusInserter;
74 
76  public:
77  typedef std::vector<std::string> vstring;
78  typedef std::vector<Worker*> AllWorkers;
79  typedef std::shared_ptr<Worker> WorkerPtr;
80  typedef std::vector<Worker*> Workers;
81 
82  GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
83  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
84  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
85  std::shared_ptr<ModuleRegistry> modReg,
86  std::vector<std::string> const& modulesToUse,
87  ParameterSet& proc_pset,
88  ProductRegistry& pregistry,
89  PreallocationConfiguration const& prealloc,
91  std::shared_ptr<ActivityRegistry> areg,
92  std::shared_ptr<ProcessConfiguration> processConfiguration,
93  ProcessContext const* processContext);
94  GlobalSchedule(GlobalSchedule const&) = delete;
95 
96  template <typename T>
97  void processOneGlobalAsync(WaitingTaskHolder holder,
98  typename T::MyPrincipal& principal,
99  EventSetupImpl const& eventSetup,
100  ServiceToken const& token,
101  bool cleaningUpAfterException = false);
102 
104  void endJob(ExceptionCollector& collector);
105 
108 
112  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
113 
116  void getTriggerReport(TriggerReport& rep) const;
117 
119  bool terminate() const;
120 
122  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
123 
125  AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
126 
127  private:
128  //Sentry class to only send a signal if an
129  // exception occurs. An exception is identified
130  // by the destructor being called without first
131  // calling completedSuccessfully().
133  public:
135  : reg_(iReg), context_(iContext) {}
137  if (reg_) {
138  reg_->preGlobalEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
139  }
140  }
141  void completedSuccessfully() { reg_ = nullptr; }
142 
143  private:
144  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
146  };
147 
149  ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
150 
151  std::vector<WorkerManager> workerManagers_;
152  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
153  std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
155  };
156 
157  template <typename T>
159  typename T::MyPrincipal& ep,
160  EventSetupImpl const& es,
161  ServiceToken const& token,
162  bool cleaningUpAfterException) {
163  try {
164  //need the doneTask to own the memory
165  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));
166 
167  if (actReg_) {
168  //Services may depend upon each other
169  ServiceRegistry::Operate op(token);
170  T::preScheduleSignal(actReg_.get(), globalContext.get());
171  }
172 
173  auto doneTask = make_waiting_task(
174  tbb::task::allocate_root(),
175  [this, iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable {
176  std::exception_ptr excpt;
177  if (iPtr) {
178  excpt = *iPtr;
179  //add context information to the exception and print message
180  try {
181  convertException::wrap([&]() { std::rethrow_exception(excpt); });
182  } catch (cms::Exception& ex) {
183  //TODO: should add the transition type info
184  std::ostringstream ost;
185  if (ex.context().empty()) {
186  ost << "Processing " << T::transitionName() << " ";
187  }
188  ServiceRegistry::Operate op(token);
189  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
190  excpt = std::current_exception();
191  }
192  if (actReg_) {
193  ServiceRegistry::Operate op(token);
194  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
195  }
196  }
197  if (actReg_) {
198  try {
199  ServiceRegistry::Operate op(token);
200  T::postScheduleSignal(actReg_.get(), globalContext.get());
201  } catch (...) {
202  if (not excpt) {
203  excpt = std::current_exception();
204  }
205  }
206  }
207  iHolder.doneWaiting(excpt);
208  });
209  workerManagers_[ep.index()].resetAll();
210 
211  ParentContext parentContext(globalContext.get());
212  //make sure the ProductResolvers know about their
213  // workers to allow proper data dependency handling
214  workerManagers_[ep.index()].setupOnDemandSystem(ep, es);
215 
216  //make sure the task doesn't get run until all workers have beens started
217  WaitingTaskHolder holdForLoop(doneTask);
218  auto& aw = workerManagers_[ep.index()].allWorkers();
219  for (Worker* worker : boost::adaptors::reverse(aw)) {
220  worker->doWorkAsync<T>(
221  doneTask, ep, es, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
222  }
223  } catch (...) {
224  iHolder.doneWaiting(std::current_exception());
225  }
226  }
227 } // namespace edm
228 
229 #endif
std::vector< std::string > vstring
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< Worker * > AllWorkers
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
static StreamID invalidStreamID()
Definition: StreamID.h:44
void beginJob()
Definition: Breakpoints.cc:14
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
std::vector< Worker * > Workers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::list< std::string > const & context() const
Definition: Exception.cc:147
std::shared_ptr< Worker > WorkerPtr
rep
Definition: cuy.py:1190
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
std::shared_ptr< ActivityRegistry > actReg_
#define noexcept
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::GlobalContext const *iContext)
ExceptionToActionTable const & actionTable() const
returns the action table
HLT enums.
double a
Definition: hdecay.h:121
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
long double T
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_