CMS 3D CMS Logo

GlobalSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_GlobalSchedule_h
2 #define FWCore_Framework_GlobalSchedule_h
3 
4 /*
5 */
6 
29 
30 #include <map>
31 #include <memory>
32 #include <set>
33 #include <string>
34 #include <vector>
35 #include <sstream>
36 #include "boost/range/adaptor/reversed.hpp"
37 
38 namespace edm {
39 
40  namespace {
41  template <typename T>
42  class GlobalScheduleSignalSentry {
43  public:
44  GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
45  : a_(a), context_(context), allowThrow_(false) {
46  if (a_)
47  T::preScheduleSignal(a_, context_);
48  }
49  ~GlobalScheduleSignalSentry() noexcept(false) {
50  // Caught exception is rethrown
51  CMS_SA_ALLOW try {
52  if (a_)
53  T::postScheduleSignal(a_, context_);
54  } catch (...) {
55  if (allowThrow_) {
56  throw;
57  }
58  }
59  }
60 
61  void allowThrow() { allowThrow_ = true; }
62 
63  private:
64  // We own none of these resources.
65  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
66  typename T::Context const* context_;
67  bool allowThrow_;
68  };
69  } // namespace
70 
71  class ActivityRegistry;
72  class ExceptionCollector;
73  class ProcessContext;
74  class PreallocationConfiguration;
75  class ModuleRegistry;
76  class TriggerResultInserter;
77  class PathStatusInserter;
78  class EndPathStatusInserter;
79 
81  public:
82  typedef std::vector<std::string> vstring;
83  typedef std::vector<Worker*> AllWorkers;
84  typedef std::shared_ptr<Worker> WorkerPtr;
85  typedef std::vector<Worker*> Workers;
86 
87  GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
88  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
89  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
90  std::shared_ptr<ModuleRegistry> modReg,
91  std::vector<std::string> const& modulesToUse,
92  ParameterSet& proc_pset,
93  ProductRegistry& pregistry,
94  PreallocationConfiguration const& prealloc,
96  std::shared_ptr<ActivityRegistry> areg,
97  std::shared_ptr<ProcessConfiguration> processConfiguration,
98  ProcessContext const* processContext);
99  GlobalSchedule(GlobalSchedule const&) = delete;
100 
101  template <typename T>
103  typename T::TransitionInfoType&,
104  ServiceToken const& token,
105  bool cleaningUpAfterException = false);
106 
108  void endJob(ExceptionCollector& collector);
109 
112 
116  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
117 
119  bool terminate() const;
120 
122  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
123 
125  void deleteModule(std::string const& iLabel);
126 
128  AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
129 
130  private:
131  //Sentry class to only send a signal if an
132  // exception occurs. An exception is identified
133  // by the destructor being called without first
134  // calling completedSuccessfully().
136  public:
138  : reg_(iReg), context_(iContext) {}
140  if (reg_) {
142  }
143  }
144  void completedSuccessfully() { reg_ = nullptr; }
145 
146  private:
147  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
149  };
150 
152  ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
153 
154  std::vector<WorkerManager> workerManagers_;
155  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
156  std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
158  };
159 
160  template <typename T>
162  typename T::TransitionInfoType& transitionInfo,
163  ServiceToken const& token,
164  bool cleaningUpAfterException) {
165  auto const& principal = transitionInfo.principal();
166 
167  // Caught exception is propagated via WaitingTaskHolder
168  CMS_SA_ALLOW try {
169  //need the doneTask to own the memory
170  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
171 
172  if (actReg_) {
173  //Services may depend upon each other
175  T::preScheduleSignal(actReg_.get(), globalContext.get());
176  }
177 
178  ServiceWeakToken weakToken = token;
179  auto doneTask = make_waiting_task(
180  [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
181  std::exception_ptr excpt;
182  if (iPtr) {
183  excpt = *iPtr;
184  //add context information to the exception and print message
185  try {
186  convertException::wrap([&]() { std::rethrow_exception(excpt); });
187  } catch (cms::Exception& ex) {
188  //TODO: should add the transition type info
189  std::ostringstream ost;
190  if (ex.context().empty()) {
191  ost << "Processing " << T::transitionName() << " ";
192  }
193  ServiceRegistry::Operate op(weakToken.lock());
194  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
195  excpt = std::current_exception();
196  }
197  if (actReg_) {
198  ServiceRegistry::Operate op(weakToken.lock());
199  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
200  }
201  }
202  if (actReg_) {
203  // Caught exception is propagated via WaitingTaskHolder
204  CMS_SA_ALLOW try {
205  ServiceRegistry::Operate op(weakToken.lock());
206  T::postScheduleSignal(actReg_.get(), globalContext.get());
207  } catch (...) {
208  if (not excpt) {
209  excpt = std::current_exception();
210  }
211  }
212  }
213  iHolder.doneWaiting(excpt);
214  });
215  WorkerManager& workerManager = workerManagers_[principal.index()];
216  workerManager.resetAll();
217 
218  ParentContext parentContext(globalContext.get());
219  //make sure the ProductResolvers know about their
220  // workers to allow proper data dependency handling
221  workerManager.setupResolvers(transitionInfo.principal());
222 
223  //make sure the task doesn't get run until all workers have beens started
224  WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
225  auto& aw = workerManager.allWorkers();
226  for (Worker* worker : boost::adaptors::reverse(aw)) {
227  worker->doWorkAsync<T>(
228  holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
229  }
230  } catch (...) {
231  iHolder.doneWaiting(std::current_exception());
232  }
233  }
234 } // namespace edm
235 
236 #endif
edm::GlobalSchedule::allWorkers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: GlobalSchedule.h:128
WorkerManager.h
edm::TerminationOrigin::ExceptionFromThisContext
edm::eventsetup::ESRecordsToProxyIndices
Definition: ESRecordsToProxyIndices.h:35
funct::false
false
Definition: Factorize.h:29
edm::GlobalSchedule::SendTerminationSignalIfException::completedSuccessfully
void completedSuccessfully()
Definition: GlobalSchedule.h:144
edm::ServiceWeakToken
Definition: ServiceToken.h:86
BranchType.h
ActivityRegistry
propagate_const.h
WaitingTaskHolder.h
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::ProcessContext
Definition: ProcessContext.h:27
edm::GlobalSchedule::terminate
bool terminate() const
Return whether each output module has reached its maximum count.
Algorithms.h
ProcessBlockPrincipal.h
edm::GlobalSchedule::deleteModule
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
Definition: GlobalSchedule.cc:116
edm::maker::ModuleHolder
Definition: ModuleHolder.h:37
edm::GlobalSchedule::vstring
std::vector< std::string > vstring
Definition: GlobalSchedule.h:82
edm::GlobalSchedule::Workers
std::vector< Worker * > Workers
Definition: GlobalSchedule.h:85
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:93
edm::GlobalSchedule::SendTerminationSignalIfException::reg_
edm::ActivityRegistry * reg_
Definition: GlobalSchedule.h:147
edm::GlobalSchedule::beginJob
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &)
Definition: GlobalSchedule.cc:93
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::ProductRegistry
Definition: ProductRegistry.h:37
ModuleDescription.h
edm::ServiceToken
Definition: ServiceToken.h:42
edm::propagate_const
Definition: propagate_const.h:32
edm::GlobalSchedule::SendTerminationSignalIfException::~SendTerminationSignalIfException
~SendTerminationSignalIfException()
Definition: GlobalSchedule.h:139
edm::GlobalSchedule::SendTerminationSignalIfException::context_
GlobalContext const * context_
Definition: GlobalSchedule.h:148
edm::GlobalSchedule::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: GlobalSchedule.h:155
edm::GlobalSchedule::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: GlobalSchedule.cc:122
edm::ExceptionCollector
Definition: ExceptionCollector.h:33
EventPrincipal.h
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
edm::GlobalSchedule
Definition: GlobalSchedule.h:80
ConvertException.h
edm::Worker
Definition: Worker.h:90
edm::GlobalSchedule::processContext_
ProcessContext const * processContext_
Definition: GlobalSchedule.h:157
edm::GlobalSchedule::workerManagers_
std::vector< WorkerManager > workerManagers_
Definition: GlobalSchedule.h:154
edm::ParentContext
Definition: ParentContext.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::GlobalSchedule::endJob
void endJob(ExceptionCollector &collector)
Definition: GlobalSchedule.cc:91
edm::GlobalContext
Definition: GlobalContext.h:29
edm::ParameterSet
Definition: ParameterSet.h:47
a
double a
Definition: hdecay.h:119
GlobalContext.h
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
ExceptionMessages.h
thread_safety_macros.h
RunPrincipal.h
edm::StreamID::invalidStreamID
static StreamID invalidStreamID()
Definition: StreamID.h:45
edm::GlobalSchedule::actionTable
ExceptionToActionTable const & actionTable() const
returns the action table
Definition: GlobalSchedule.h:152
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
ExceptionActions.h
edm::GlobalSchedule::extraWorkers_
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_
Definition: GlobalSchedule.h:156
edm::GlobalSchedule::processOneGlobalAsync
void processOneGlobalAsync(WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
Definition: GlobalSchedule.h:161
edm::ServiceWeakToken::lock
ServiceToken lock() const
Definition: ServiceToken.h:101
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
LuminosityBlockPrincipal.h
edm::GlobalSchedule::GlobalSchedule
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, ProcessContext const *processContext)
Definition: GlobalSchedule.cc:22
OccurrenceTraits.h
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::GlobalSchedule::SendTerminationSignalIfException::SendTerminationSignalIfException
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::GlobalContext const *iContext)
Definition: GlobalSchedule.h:137
edm::GlobalSchedule::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition: GlobalSchedule.h:84
ExceptionHelpers.h
Frameworkfwd.h
T
long double T
Definition: Basic3DVectorLD.h:48
Worker.h
WorkerRegistry.h
Exception.h
edm::GlobalSchedule::SendTerminationSignalIfException
Definition: GlobalSchedule.h:135
edm::GlobalSchedule::replaceModule
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
Definition: GlobalSchedule.cc:98
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
cms::Exception
Definition: Exception.h:70
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::WorkerManager
Definition: WorkerManager.h:35
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
edm::GlobalSchedule::AllWorkers
std::vector< Worker * > AllWorkers
Definition: GlobalSchedule.h:83
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::ActivityRegistry::preGlobalEarlyTerminationSignal_
PreGlobalEarlyTermination preGlobalEarlyTerminationSignal_
Definition: ActivityRegistry.h:491