CMS 3D CMS Logo

GlobalSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
3 
4 /*
5 */
6 
24 
25 #include <map>
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <vector>
30 #include <sstream>
31 #include "boost/range/adaptor/reversed.hpp"
32 
33 
34 namespace edm {
35 
36  namespace {
37  template <typename T>
    // Scope guard that brackets a schedule transition with the pre/post
    // schedule signals of the transition traits type T.
    //
    // T must provide a nested Context type and the static functions
    // preScheduleSignal(ActivityRegistry*, Context const*) and
    // postScheduleSignal(ActivityRegistry*, Context const*).
    //
    // The pre signal is emitted by the constructor and the post signal by the
    // destructor. Passing a null registry disables both signals.
38  class GlobalScheduleSignalSentry {
39  public:
    // Records the registry and context (neither is owned) and immediately
    // emits the pre-schedule signal when a registry was supplied.
    // Exceptions from the post signal are suppressed until allowThrow() is called.
40  GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
41  a_(a), context_(context),
42  allowThrow_(false) {
43  if (a_) T::preScheduleSignal(a_, context_);
44  }
    // Emits the post-schedule signal when a registry was supplied.
    // The destructor is noexcept(false): an exception thrown by the post
    // signal is rethrown only if allowThrow() was called beforehand;
    // otherwise it is caught and silently discarded.
45  ~GlobalScheduleSignalSentry() noexcept(false) {
46  try {
47  if (a_) T::postScheduleSignal(a_, context_);
48  } catch(...) {
49  if(allowThrow_) {throw;} // rethrow the in-flight exception only when explicitly permitted
50  }
51  }
52 
    // Permits the destructor to propagate an exception raised by the post signal.
    // NOTE(review): presumably meant to be called once the guarded work has
    // finished without throwing, so the destructor never throws during stack
    // unwinding - confirm against callers.
53  void allowThrow() {
54  allowThrow_ = true;
55  }
56 
57  private:
58  // We own none of these resources.
59  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
60  typename T::Context const* context_; // context handed to both the pre and post signal
61  bool allowThrow_; // false until allowThrow() is called; gates rethrow in the destructor
62  };
63  }
64 
65  class ActivityRegistry;
66  class EventSetup;
67  class ExceptionCollector;
68  class ProcessContext;
69  class PreallocationConfiguration;
70  class ModuleRegistry;
71  class TriggerResultInserter;
72  class PathStatusInserter;
73  class EndPathStatusInserter;
74 
76  public:
77  typedef std::vector<std::string> vstring;
78  typedef std::vector<Worker*> AllWorkers;
79  typedef std::shared_ptr<Worker> WorkerPtr;
80  typedef std::vector<Worker*> Workers;
81 
82  GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
83  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
84  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
85  std::shared_ptr<ModuleRegistry> modReg,
86  std::vector<std::string> const& modulesToUse,
87  ParameterSet& proc_pset,
88  ProductRegistry& pregistry,
89  PreallocationConfiguration const& prealloc,
91  std::shared_ptr<ActivityRegistry> areg,
92  std::shared_ptr<ProcessConfiguration> processConfiguration,
93  ProcessContext const* processContext);
94  GlobalSchedule(GlobalSchedule const&) = delete;
95 
96  template <typename T>
97  void processOneGlobalAsync(WaitingTaskHolder holder,
98  typename T::MyPrincipal& principal,
99  EventSetup const& eventSetup,
100  bool cleaningUpAfterException = false);
101 
102  void beginJob(ProductRegistry const&);
103  void endJob(ExceptionCollector & collector);
104 
107 
111  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
112 
115  void getTriggerReport(TriggerReport& rep) const;
116 
118  bool terminate() const;
119 
121  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
122 
124  AllWorkers const& allWorkers() const {
     // Returns the collection of pointers to workers (AllWorkers is
     // std::vector<Worker*>), obtained by forwarding to workerManager_.
125  return workerManager_.allWorkers();
126  }
127 
128  private:
129  //Sentry class to only send a signal if an
130  // exception occurs. An exception is identified
131  // by the destructor being called without first
132  // calling completedSuccessfully().
134  public:
136  reg_(iReg),
137  context_(iContext){}
139  if(reg_) {
140  reg_->preGlobalEarlyTerminationSignal_(*context_,TerminationOrigin::ExceptionFromThisContext);
141  }
142  }
144  reg_ = nullptr;
145  }
146  private:
147  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
149  };
150 
151 
154  return workerManager_.actionTable();
155  }
156 
157  void addToAllWorkers(Worker* w);
158 
160  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
162  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
163  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
164 
166  };
167 
168 
169  template <typename T>
     // Asynchronously runs every worker of this schedule for one global
     // transition described by the traits type T.
     //
     // ep  - principal for the transition (typename T::MyPrincipal)
     // es  - EventSetup handed to each worker
     // cleaningUpAfterException - forwarded to addContextAndPrintException
     //                            when an exception is reported
     //
     // Completion, with or without an exception, is reported through
     // iHolder.doneWaiting() from the completion task built below.
     // NOTE(review): the line carrying the function name and the iHolder
     // parameter, and the line that declares `token`, are absent from this
     // listing; `token` is presumably ServiceRegistry::instance().presentToken()
     // - confirm against the real header.
170  void
172  typename T::MyPrincipal& ep,
173  EventSetup const& es,
174  bool cleaningUpAfterException) {
176 
177  //need the doneTask to own the memory
     // (the context is held in a shared_ptr and captured by value in the
     // completion lambda, so it stays alive until that lambda has run)
178  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));
179 
     // Emit the pre-schedule signal only when an ActivityRegistry is present.
180  if(actReg_) {
181  T::preScheduleSignal(actReg_.get(), globalContext.get());
182  }
183 
184 
185  //If we are in an end transition, we need to reset failed items since they might
186  // be set this time around
187  if( not T::begin_) {
188  ep.resetFailedFromThisProcess();
189  }
190 
     // Completion task: invoked with a pointer to the exception raised by the
     // workers, or a null pointer when none was raised.
191  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
192  [this,iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable
193  {
     // Install the captured service token for the duration of this task.
194  ServiceRegistry::Operate op(token);
195  std::exception_ptr excpt;
196  if(iPtr) {
197  excpt = *iPtr;
198  //add context information to the exception and print message
     // The exception is rethrown through convertException::wrap so that it can
     // be caught below as a cms::Exception.
199  try {
200  convertException::wrap([&]() {
201  std::rethrow_exception(excpt);
202  });
203  } catch(cms::Exception& ex) {
204  //TODO: should add the transition type info
205  std::ostringstream ost;
     // Only add the "Processing <transition>" prefix when the exception carries
     // no context yet; otherwise an empty string is passed as context.
206  if(ex.context().empty()) {
207  ost<<"Processing "<<T::transitionName()<<" ";
208  }
209  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
     // Replace the stored exception with the one currently being handled.
210  excpt = std::current_exception();
211  }
     // Notify listeners that this global context is terminating early because of
     // an exception raised within it.
212  if(actReg_) {
213  actReg_->preGlobalEarlyTerminationSignal_(*globalContext,TerminationOrigin::ExceptionFromThisContext);
214  }
215  }
     // The post-schedule signal is always attempted; an exception it throws is
     // kept only if no earlier exception has been recorded.
216  if(actReg_) {
217  try {
218  T::postScheduleSignal(actReg_.get(), globalContext.get());
219  } catch(...) {
220  if(not excpt) {
221  excpt = std::current_exception();
222  }
223  }
224  }
     // Hand the (possibly null) exception to the waiting holder.
225  iHolder.doneWaiting(excpt);
226 
227  });
     // Reset all workers before they are scheduled for this transition.
228  workerManager_.resetAll();
229 
230  ParentContext parentContext(globalContext.get());
231  //make sure the ProductResolvers know about their
232  // workers to allow proper data dependency handling
233  workerManager_.setupOnDemandSystem(ep,es);
234 
235  //make sure the task doesn't get run until all workers have been started
236  WaitingTaskHolder holdForLoop(doneTask);
     // Workers are visited in reverse order of allWorkers(); each is handed
     // doneTask as the task to notify and StreamID::invalidStreamID() as its stream.
     // NOTE(review): the reason for the reverse order is not visible in this listing.
237  for(auto& worker: boost::adaptors::reverse((allWorkers()))) {
238  worker->doWorkAsync<T>(doneTask,ep,es,StreamID::invalidStreamID(),parentContext,globalContext.get());
239  }
240 
241  }
242 
243 }
244 
245 #endif
std::vector< std::string > vstring
edm::propagate_const< WorkerPtr > results_inserter_
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
const double w
Definition: UKUtility.cc:23
std::vector< Worker * > AllWorkers
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
static StreamID invalidStreamID()
Definition: StreamID.h:48
ServiceToken presentToken() const
void beginJob()
Definition: Breakpoints.cc:15
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
std::vector< Worker * > Workers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::shared_ptr< Worker > WorkerPtr
rep
Definition: cuy.py:1188
std::shared_ptr< ActivityRegistry > actReg_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::GlobalContext const *iContext)
ExceptionToActionTable const & actionTable() const
returns the action table
HLT enums.
double a
Definition: hdecay.h:121
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
WorkerManager workerManager_
long double T