CMS 3D CMS Logo

GlobalSchedule.cc
Go to the documentation of this file.
7 
15 
16 #include <algorithm>
17 #include <cassert>
18 #include <cstdlib>
19 #include <functional>
20 #include <map>
21 #include <sstream>
22 
23 namespace edm {
25  std::shared_ptr<TriggerResultInserter> inserter,
26  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
27  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
28  std::shared_ptr<ModuleRegistry> modReg,
29  std::vector<std::string> const& iModulesToUse,
30  ParameterSet& proc_pset,
31  ProductRegistry& pregistry,
32  PreallocationConfiguration const& prealloc,
34  std::shared_ptr<ActivityRegistry> areg,
35  std::shared_ptr<ProcessConfiguration const> processConfiguration,
36  ProcessContext const* processContext)
37  : actReg_(areg),
38  processContext_(processContext),
39  numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()),
40  numberOfConcurrentRuns_(prealloc.numberOfRuns()) {
41  unsigned int nManagers = prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns() +
43  workerManagers_.reserve(nManagers);
44  for (unsigned int i = 0; i < nManagers; ++i) {
45  workerManagers_.emplace_back(modReg, areg, actions);
46  }
47  for (auto const& moduleLabel : iModulesToUse) {
48  bool isTracked;
49  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
50  if (modpset != nullptr) { // It will be null for PathStatusInserters, it should
51  // be impossible to be null for anything else
52  assert(isTracked);
53 
54  //side effect keeps this module around
55  for (auto& wm : workerManagers_) {
56  wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
57  }
58  }
59  }
60  if (inserter) {
61  inserter->doPreallocate(prealloc);
62  for (auto& wm : workerManagers_) {
64  inserter, inserter->moduleDescription(), &actions)); // propagate_const<T> has no reset() function
65  results_inserter->setActivityRegistry(actReg_);
66  wm.addToAllWorkers(results_inserter.get());
67  extraWorkers_.emplace_back(std::move(results_inserter));
68  }
69  }
70 
71  for (auto& pathStatusInserter : pathStatusInserters) {
72  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
73  inserterPtr->doPreallocate(prealloc);
74 
75  for (auto& wm : workerManagers_) {
76  WorkerPtr workerPtr(
77  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
78  workerPtr->setActivityRegistry(actReg_);
79  wm.addToAllWorkers(workerPtr.get());
80  extraWorkers_.emplace_back(std::move(workerPtr));
81  }
82  }
83 
84  for (auto& endPathStatusInserter : endPathStatusInserters) {
85  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
86  inserterPtr->doPreallocate(prealloc);
87  for (auto& wm : workerManagers_) {
89  inserterPtr, inserterPtr->moduleDescription(), &actions));
90  workerPtr->setActivityRegistry(actReg_);
91  wm.addToAllWorkers(workerPtr.get());
92  extraWorkers_.emplace_back(std::move(workerPtr));
93  }
94  }
95 
96  } // GlobalSchedule::GlobalSchedule
97 
100  ProcessBlockHelperBase const& processBlockHelperBase,
101  PathsAndConsumesOfModulesBase const& pathsAndConsumesOfModules,
102  ProcessContext const& processContext) {
104  unsigned int const managerIndex =
106 
107  std::exception_ptr exceptionPtr;
108  CMS_SA_ALLOW try {
109  try {
110  convertException::wrap([this, &pathsAndConsumesOfModules, &processContext]() {
111  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext);
112  });
113  } catch (cms::Exception& ex) {
114  exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
115  throw;
116  }
117  workerManagers_[managerIndex].beginJob(iRegistry, iESIndices, processBlockHelperBase, globalContext);
118  } catch (...) {
119  exceptionPtr = std::current_exception();
120  }
121 
122  try {
123  convertException::wrap([this]() { actReg_->postBeginJobSignal_(); });
124  } catch (cms::Exception& ex) {
125  if (!exceptionPtr) {
126  exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
127  exceptionPtr = std::current_exception();
128  }
129  }
130  if (exceptionPtr) {
131  std::rethrow_exception(exceptionPtr);
132  }
133  }
134 
137  unsigned int const managerIndex =
139 
140  std::exception_ptr exceptionPtr;
141  CMS_SA_ALLOW try {
142  try {
143  convertException::wrap([this]() { actReg_->preEndJobSignal_(); });
144  } catch (cms::Exception& ex) {
145  exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
146  throw;
147  }
148  workerManagers_[managerIndex].endJob(collector, globalContext);
149  } catch (...) {
150  exceptionPtr = std::current_exception();
151  }
152 
153  try {
154  convertException::wrap([this]() { actReg_->postEndJobSignal_(); });
155  } catch (cms::Exception& ex) {
156  if (!exceptionPtr) {
157  exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
158  exceptionPtr = std::current_exception();
159  }
160  }
161  if (exceptionPtr) {
162  collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
163  }
164  }
165 
167  Worker* found = nullptr;
168  unsigned int const jobManagerIndex =
170  unsigned int managerIndex = 0;
171  for (auto& wm : workerManagers_) {
172  for (auto const& worker : wm.allWorkers()) {
173  if (worker->description()->moduleLabel() == iLabel) {
174  found = worker;
175  break;
176  }
177  }
178  if (nullptr == found) {
179  return;
180  }
181 
182  iMod->replaceModuleFor(found);
183  if (managerIndex == jobManagerIndex) {
185  found->beginJob(globalContext);
186  }
187  ++managerIndex;
188  }
189  }
190 
192  for (auto& wm : workerManagers_) {
193  wm.deleteModuleIfExists(iLabel);
194  }
195  }
196 
197  std::vector<ModuleDescription const*> GlobalSchedule::getAllModuleDescriptions() const {
198  std::vector<ModuleDescription const*> result;
199  result.reserve(allWorkers().size());
200 
201  for (auto const& worker : allWorkers()) {
202  ModuleDescription const* p = worker->description();
203  result.push_back(p);
204  }
205  return result;
206  }
207 
209  ServiceWeakToken const& weakToken,
210  bool cleaningUpAfterException,
211  std::exception_ptr& excpt) {
212  //add context information to the exception and print message
213  try {
214  convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
215  } catch (cms::Exception& ex) {
216  std::ostringstream ost;
217  // In most cases the exception will already have context at this point,
218  // but add some context here in those rare cases where it does not.
219  if (ex.context().empty()) {
220  exceptionContext(ost, *globalContext);
221  }
222  ServiceRegistry::Operate op(weakToken.lock());
223  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
224  excpt = std::current_exception();
225  }
226  // We are already handling an earlier exception, so ignore it
227  // if this signal results in another exception being thrown.
228  CMS_SA_ALLOW try {
229  if (actReg_) {
230  ServiceRegistry::Operate op(weakToken.lock());
231  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
232  }
233  } catch (...) {
234  }
235  }
236 
237 } // namespace edm
#define CMS_SA_ALLOW
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual void replaceModuleFor(Worker *) const =0
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
assert(be >=bs)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
GlobalSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
std::shared_ptr< Worker > WorkerPtr
void endJob(ExceptionCollector &collector)
ServiceToken lock() const
Definition: ServiceToken.h:101
std::shared_ptr< ActivityRegistry > actReg_
void beginJob(ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &, PathsAndConsumesOfModulesBase const &, ProcessContext const &)
unsigned int numberOfConcurrentRuns_
constexpr T & get_underlying(propagate_const< T > &)
void handleException(GlobalContext const *, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &)
HLT enums.
static constexpr unsigned int numberOfConcurrentProcessBlocks_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
void call(std::function< void(void)>)
std::list< std::string > const & context() const
Definition: Exception.cc:151
unsigned int numberOfConcurrentLumis_
def move(src, dest)
Definition: eostools.py:511
static constexpr unsigned int numberOfConcurrentJobs_
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_