CMS 3D CMS Logo

GlobalSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
3 
4 /*
5 */
6 
24 
25 #include <map>
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <vector>
30 #include <sstream>
31 #include "boost/range/adaptor/reversed.hpp"
32 
33 
34 namespace edm {
35 
36  namespace {
37  template <typename T>
38  class GlobalScheduleSignalSentry {
39  public:
40  GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
41  a_(a), context_(context),
42  allowThrow_(false) {
43  if (a_) T::preScheduleSignal(a_, context_);
44  }
45  ~GlobalScheduleSignalSentry() noexcept(false) {
46  try {
47  if (a_) T::postScheduleSignal(a_, context_);
48  } catch(...) {
49  if(allowThrow_) {throw;}
50  }
51  }
52 
53  void allowThrow() {
54  allowThrow_ = true;
55  }
56 
57  private:
58  // We own none of these resources.
59  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
60  typename T::Context const* context_;
61  bool allowThrow_;
62  };
63  }
64 
65  class ActivityRegistry;
66  class EventSetup;
67  class ExceptionCollector;
68  class ProcessContext;
69  class PreallocationConfiguration;
70  class ModuleRegistry;
71  class TriggerResultInserter;
72 
74  public:
75  typedef std::vector<std::string> vstring;
76  typedef std::vector<Worker*> AllWorkers;
77  typedef std::shared_ptr<Worker> WorkerPtr;
78  typedef std::vector<Worker*> Workers;
79 
80  GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
81  std::shared_ptr<ModuleRegistry> modReg,
82  std::vector<std::string> const& modulesToUse,
83  ParameterSet& proc_pset,
84  ProductRegistry& pregistry,
85  PreallocationConfiguration const& prealloc,
87  std::shared_ptr<ActivityRegistry> areg,
88  std::shared_ptr<ProcessConfiguration> processConfiguration,
89  ProcessContext const* processContext);
90  GlobalSchedule(GlobalSchedule const&) = delete;
91 
92  template <typename T>
93  void processOneGlobal(typename T::MyPrincipal& principal,
94  EventSetup const& eventSetup,
95  bool cleaningUpAfterException = false);
96 
97  template <typename T>
98  void processOneGlobalAsync(WaitingTaskHolder holder,
99  typename T::MyPrincipal& principal,
100  EventSetup const& eventSetup,
101  bool cleaningUpAfterException = false);
102 
103  void beginJob(ProductRegistry const&);
104  void endJob(ExceptionCollector & collector);
105 
108 
112  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
113 
116  void getTriggerReport(TriggerReport& rep) const;
117 
119  bool terminate() const;
120 
122  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
123 
125  AllWorkers const& allWorkers() const {
126  return workerManager_.allWorkers();
127  }
128 
129  private:
130  //Sentry class to only send a signal if an
131  // exception occurs. An exception is identified
132  // by the destructor being called without first
133  // calling completedSuccessfully().
135  public:
137  reg_(iReg),
138  context_(iContext){}
140  if(reg_) {
141  reg_->preGlobalEarlyTerminationSignal_(*context_,TerminationOrigin::ExceptionFromThisContext);
142  }
143  }
145  reg_ = nullptr;
146  }
147  private:
148  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
150  };
151 
152 
153  template<typename T>
154  void runNow(typename T::MyPrincipal const& p, EventSetup const& es,
155  GlobalContext const* context);
156 
159  return workerManager_.actionTable();
160  }
161 
162  void addToAllWorkers(Worker* w);
163 
165  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
167 
168 
170  };
171 
172 
173  template <typename T>
174  void
175  GlobalSchedule::processOneGlobal(typename T::MyPrincipal& ep,
176  EventSetup const& es,
177  bool cleaningUpAfterException) {
178  GlobalContext globalContext = T::makeGlobalContext(ep, processContext_);
179 
180  GlobalScheduleSignalSentry<T> sentry(actReg_.get(), &globalContext);
181 
182  SendTerminationSignalIfException terminationSentry(actReg_.get(), &globalContext);
183 
184  //If we are in an end transition, we need to reset failed items since they might
185  // be set this time around
186  if( not T::begin_) {
187  ep.resetFailedFromThisProcess();
188  }
189  // This call takes care of the unscheduled processing.
190  workerManager_.processOneOccurrence<T>(ep, es, StreamID::invalidStreamID(), &globalContext, &globalContext, cleaningUpAfterException);
191 
192  try {
193  convertException::wrap([&]() {
194  runNow<T>(ep,es,&globalContext);
195  });
196  }
197  catch(cms::Exception& ex) {
198  if (ex.context().empty()) {
199  addContextAndPrintException("Calling function GlobalSchedule::processOneGlobal", ex, cleaningUpAfterException);
200  } else {
201  addContextAndPrintException("", ex, cleaningUpAfterException);
202  }
203  throw;
204  }
205  terminationSentry.completedSuccessfully();
206 
207  //If we got here no other exception has happened so we can propogate any Service related exceptions
208  sentry.allowThrow();
209  }
210 
211  template <typename T>
212  void
214  typename T::MyPrincipal& ep,
215  EventSetup const& es,
216  bool cleaningUpAfterException) {
218 
219  //need the doneTask to own the memory
220  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));
221 
222  if(actReg_) {
223  T::preScheduleSignal(actReg_.get(), globalContext.get());
224  }
225 
226 
227  //If we are in an end transition, we need to reset failed items since they might
228  // be set this time around
229  if( not T::begin_) {
230  ep.resetFailedFromThisProcess();
231  }
232 
233  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
234  [this,iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable
235  {
236  ServiceRegistry::Operate op(token);
237  std::exception_ptr excpt;
238  if(iPtr) {
239  excpt = *iPtr;
240  //add context information to the exception and print message
241  try {
242  convertException::wrap([&]() {
243  std::rethrow_exception(excpt);
244  });
245  } catch(cms::Exception& ex) {
246  //TODO: should add the transition type info
247  std::ostringstream ost;
248  if(ex.context().empty()) {
249  ost<<"Processing "<<T::transitionName()<<" ";
250  }
251  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
252  excpt = std::current_exception();
253  }
254  if(actReg_) {
255  actReg_->preGlobalEarlyTerminationSignal_(*globalContext,TerminationOrigin::ExceptionFromThisContext);
256  }
257  }
258  if(actReg_) {
259  try {
260  T::postScheduleSignal(actReg_.get(), globalContext.get());
261  } catch(...) {
262  if(not excpt) {
263  excpt = std::current_exception();
264  }
265  }
266  }
267  iHolder.doneWaiting(excpt);
268 
269  });
270  workerManager_.resetAll();
271 
272  ParentContext parentContext(globalContext.get());
273  //make sure the ProductResolvers know about their
274  // workers to allow proper data dependency handling
275  workerManager_.setupOnDemandSystem(ep,es);
276 
277  //make sure the task doesn't get run until all workers have beens started
278  WaitingTaskHolder holdForLoop(doneTask);
279  for(auto& worker: boost::adaptors::reverse((allWorkers()))) {
280  worker->doWorkAsync<T>(doneTask,ep,es,StreamID::invalidStreamID(),parentContext,globalContext.get());
281  }
282 
283  }
284 
285  template <typename T>
286  void
287  GlobalSchedule::runNow(typename T::MyPrincipal const& p, EventSetup const& es,
288  GlobalContext const* context) {
289  //do nothing for event since we will run when requested
290  for(auto & worker: allWorkers()) {
291  try {
292  ParentContext parentContext(context);
293  worker->doWork<T>(p, es,StreamID::invalidStreamID(), parentContext, context);
294  }
295  catch (cms::Exception & ex) {
296  if(ex.context().empty()) {
297  std::ostringstream ost;
298  ost << "Processing " <<T::transitionName()<<" "<< p.id();
299  ex.addContext(ost.str());
300  }
301  throw;
302  }
303  }
304  }
305 }
306 
307 #endif
void processOneGlobal(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::vector< std::string > vstring
edm::propagate_const< WorkerPtr > results_inserter_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
const double w
Definition: UKUtility.cc:23
std::vector< Worker * > AllWorkers
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void runNow(typename T::MyPrincipal const &p, EventSetup const &es, GlobalContext const *context)
static StreamID invalidStreamID()
Definition: StreamID.h:48
ServiceToken presentToken() const
void beginJob()
Definition: Breakpoints.cc:15
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
std::vector< Worker * > Workers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::shared_ptr< Worker > WorkerPtr
rep
Definition: cuy.py:1188
std::shared_ptr< ActivityRegistry > actReg_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void addContext(std::string const &context)
Definition: Exception.cc:227
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::GlobalContext const *iContext)
ExceptionToActionTable const & actionTable() const
returns the action table
HLT enums.
double a
Definition: hdecay.h:121
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
WorkerManager workerManager_
long double T