CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::GlobalSchedule Class Reference

#include <GlobalSchedule.h>

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void beginJob (ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &)
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endJob (ExceptionCollector &collector)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
 GlobalSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
 
 GlobalSchedule (GlobalSchedule const &)=delete
 
template<typename T >
void processOneGlobalAsync (WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 Replace the module used by the workers labelled iLabel with the module held by iMod. More...
 
bool terminate () const
 Return whether each output module has reached its maximum count. More...
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void handleException (GlobalContext const *, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &)
 
template<typename T >
void postScheduleSignal (GlobalContext const *, ServiceWeakToken const &, std::exception_ptr &)
 
template<typename T >
void preScheduleSignal (GlobalContext const *, ServiceToken const &)
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_
 
unsigned int numberOfConcurrentLumis_
 
ProcessContext const * processContext_
 
std::vector< WorkerManager > workerManagers_
 

Detailed Description

Definition at line 50 of file GlobalSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::GlobalSchedule::AllWorkers

Definition at line 53 of file GlobalSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::GlobalSchedule::vstring

Definition at line 52 of file GlobalSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::GlobalSchedule::WorkerPtr

Definition at line 54 of file GlobalSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::GlobalSchedule::Workers

Definition at line 55 of file GlobalSchedule.h.

Constructor & Destructor Documentation

◆ GlobalSchedule() [1/2]

edm::GlobalSchedule::GlobalSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
std::vector< std::string > const &  modulesToUse,
ParameterSet &  proc_pset,
ProductRegistry &  pregistry,
PreallocationConfiguration const &  prealloc,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
ProcessContext const *  processContext 
)

Definition at line 22 of file GlobalSchedule.cc.

References actions, actReg_, cms::cuda::assert(), extraWorkers_, edm::get_underlying(), edm::ParameterSet::getPSetForUpdate(), mps_fire::i, HerwigMaxPtPartonFilter_cfi::moduleLabel, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), and workerManagers_.

35  : actReg_(areg), processContext_(processContext), numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()) {
36  unsigned int nManagers = prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns();
37  workerManagers_.reserve(nManagers);
38  for (unsigned int i = 0; i < nManagers; ++i) {
39  workerManagers_.emplace_back(modReg, areg, actions);
40  }
41  for (auto const& moduleLabel : iModulesToUse) {
42  bool isTracked;
43  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
44  if (modpset != nullptr) { // It will be null for PathStatusInserters, it should
45  // be impossible to be null for anything else
46  assert(isTracked);
47 
48  //side effect keeps this module around
49  for (auto& wm : workerManagers_) {
50  wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
51  }
52  }
53  }
54  if (inserter) {
55  inserter->doPreallocate(prealloc);
56  for (auto& wm : workerManagers_) {
57  auto results_inserter = WorkerPtr(new edm::WorkerT<TriggerResultInserter::ModuleType>(
58  inserter, inserter->moduleDescription(), &actions)); // propagate_const<T> has no reset() function
59  results_inserter->setActivityRegistry(actReg_);
60  wm.addToAllWorkers(results_inserter.get());
61  extraWorkers_.emplace_back(std::move(results_inserter));
62  }
63  }
64 
65  for (auto& pathStatusInserter : pathStatusInserters) {
66  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
67  inserterPtr->doPreallocate(prealloc);
68 
69  for (auto& wm : workerManagers_) {
70  WorkerPtr workerPtr(
71  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
72  workerPtr->setActivityRegistry(actReg_);
73  wm.addToAllWorkers(workerPtr.get());
74  extraWorkers_.emplace_back(std::move(workerPtr));
75  }
76  }
77 
78  for (auto& endPathStatusInserter : endPathStatusInserters) {
79  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
80  inserterPtr->doPreallocate(prealloc);
81  for (auto& wm : workerManagers_) {
82  WorkerPtr workerPtr(new edm::WorkerT<EndPathStatusInserter::ModuleType>(
83  inserterPtr, inserterPtr->moduleDescription(), &actions));
84  workerPtr->setActivityRegistry(actReg_);
85  wm.addToAllWorkers(workerPtr.get());
86  extraWorkers_.emplace_back(std::move(workerPtr));
87  }
88  }
89 
90  } // GlobalSchedule::GlobalSchedule
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
assert(be >=bs)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
constexpr T & get_underlying(propagate_const< T > &)
ProcessContext const * processContext_
unsigned int numberOfConcurrentLumis_
def move(src, dest)
Definition: eostools.py:511
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_

◆ GlobalSchedule() [2/2]

edm::GlobalSchedule::GlobalSchedule ( GlobalSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::GlobalSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 104 of file GlobalSchedule.h.

References workerManagers_.

104 { return workerManagers_[0].actionTable(); }
std::vector< WorkerManager > workerManagers_

◆ allWorkers()

AllWorkers const& edm::GlobalSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 100 of file GlobalSchedule.h.

References workerManagers_.

Referenced by getAllModuleDescriptions().

100 { return workerManagers_[0].allWorkers(); }
std::vector< WorkerManager > workerManagers_

◆ beginJob()

void edm::GlobalSchedule::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProductResolverIndices const &  iESIndices,
ProcessBlockHelperBase const &  processBlockHelperBase 
)

Definition at line 94 of file GlobalSchedule.cc.

References workerManagers_.

96  {
97  workerManagers_[0].beginJob(iRegistry, iESIndices, processBlockHelperBase);
98  }
std::vector< WorkerManager > workerManagers_

◆ deleteModule()

void edm::GlobalSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 118 of file GlobalSchedule.cc.

References workerManagers_.

118  {
119  for (auto& wm : workerManagers_) {
120  wm.deleteModuleIfExists(iLabel);
121  }
122  }
std::vector< WorkerManager > workerManagers_

◆ endJob()

void edm::GlobalSchedule::endJob ( ExceptionCollector &  collector)

Definition at line 92 of file GlobalSchedule.cc.

References workerManagers_.

92 { workerManagers_[0].endJob(collector); }
std::vector< WorkerManager > workerManagers_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::GlobalSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this GlobalSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 124 of file GlobalSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, and mps_fire::result.

124  {
125  std::vector<ModuleDescription const*> result;
126  result.reserve(allWorkers().size());
127 
128  for (auto const& worker : allWorkers()) {
129  ModuleDescription const* p = worker->description();
130  result.push_back(p);
131  }
132  return result;
133  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ handleException()

void edm::GlobalSchedule::handleException ( GlobalContext const *  globalContext,
ServiceWeakToken const &  weakToken,
bool  cleaningUpAfterException,
std::exception_ptr &  excpt 
)
private

Definition at line 135 of file GlobalSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::exceptionContext(), edm::ExceptionFromThisContext, edm::ServiceWeakToken::lock(), findAndChange::op, and edm::convertException::wrap().

Referenced by processOneGlobalAsync().

138  {
139  //add context information to the exception and print message
140  try {
141  convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
142  } catch (cms::Exception& ex) {
143  std::ostringstream ost;
144  // In most cases the exception will already have context at this point,
145  // but add some context here in those rare cases where it does not.
146  if (ex.context().empty()) {
147  exceptionContext(ost, *globalContext);
148  }
149  ServiceRegistry::Operate op(weakToken.lock());
150  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
151  excpt = std::current_exception();
152  }
153  // We are already handling an earlier exception, so ignore it
154  // if this signal results in another exception being thrown.
155  CMS_SA_ALLOW try {
156  if (actReg_) {
157  ServiceRegistry::Operate op(weakToken.lock());
158  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
159  }
160  } catch (...) {
161  }
162  }
#define CMS_SA_ALLOW
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ postScheduleSignal()

template<typename T >
void edm::GlobalSchedule::postScheduleSignal ( GlobalContext const *  globalContext,
ServiceWeakToken const &  weakToken,
std::exception_ptr &  excpt 
)
private

Definition at line 197 of file GlobalSchedule.h.

References actReg_, cms::Exception::addContext(), edm::exceptionContext(), edm::ServiceWeakToken::lock(), findAndChange::op, and edm::convertException::wrap().

199  {
200  if (actReg_) {
201  try {
202  convertException::wrap([this, &weakToken, globalContext]() {
203  ServiceRegistry::Operate op(weakToken.lock());
204  T::postScheduleSignal(actReg_.get(), globalContext);
205  });
206  } catch (cms::Exception& ex) {
207  if (not excpt) {
208  std::ostringstream ost;
209  ex.addContext("Handling post signal, likely in a service function");
210  exceptionContext(ost, *globalContext);
211  ex.addContext(ost.str());
212  excpt = std::current_exception();
213  }
214  }
215  }
216  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())

◆ preScheduleSignal()

template<typename T >
void edm::GlobalSchedule::preScheduleSignal ( GlobalContext const *  globalContext,
ServiceToken const &  token 
)
private

Definition at line 181 of file GlobalSchedule.h.

References actReg_, cms::Exception::addContext(), edm::exceptionContext(), findAndChange::op, unpackBuffers-CaloStage2::token, and edm::convertException::wrap().

181  {
182  if (actReg_) {
183  try {
184  ServiceRegistry::Operate op(token);
185  convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
186  } catch (cms::Exception& ex) {
187  std::ostringstream ost;
188  ex.addContext("Handling pre signal, likely in a service function");
189  exceptionContext(ost, *globalContext);
190  ex.addContext(ost.str());
191  throw;
192  }
193  }
194  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())

◆ processOneGlobalAsync()

template<typename T >
void edm::GlobalSchedule::processOneGlobalAsync ( WaitingTaskHolder  holder,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 125 of file GlobalSchedule.h.

References edm::WorkerManager::allWorkers(), CMS_SA_ALLOW, ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), edm::WaitingTaskHolder::doneWaiting(), edm::WaitingTaskHolder::group(), handleException(), edm::InRun, edm::StreamID::invalidStreamID(), edm::make_waiting_task(), numberOfConcurrentLumis_, processContext_, edm::WorkerManager::resetAll(), groupFilesInBlocks::reverse, edm::WorkerManager::setupResolvers(), unpackBuffers-CaloStage2::token, and workerManagers_.

128  {
129  auto const& principal = transitionInfo.principal();
130 
131  // Caught exception is propagated via WaitingTaskHolder
132  CMS_SA_ALLOW try {
133  //need the doneTask to own the memory
134  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
135 
136  ServiceWeakToken weakToken = token;
137  auto doneTask = make_waiting_task(
138  [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
139  std::exception_ptr excpt;
140  if (iPtr) {
141  excpt = *iPtr;
142  // add context information to the exception and print message
143  handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
144  }
145  postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
146  iHolder.doneWaiting(excpt);
147  });
148 
149  //make sure the task doesn't get run until all workers have beens started
150  WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
151 
152  CMS_SA_ALLOW try {
153  preScheduleSignal<T>(globalContext.get(), token);
154 
155  unsigned int managerIndex = principal.index();
156  if constexpr (T::branchType_ == InRun) {
157  managerIndex += numberOfConcurrentLumis_;
158  }
159  WorkerManager& workerManager = workerManagers_[managerIndex];
160  workerManager.resetAll();
161 
162  ParentContext parentContext(globalContext.get());
163  // make sure the ProductResolvers know about their
164  // workers to allow proper data dependency handling
165  workerManager.setupResolvers(transitionInfo.principal());
166 
167  auto& aw = workerManager.allWorkers();
168  for (Worker* worker : boost::adaptors::reverse(aw)) {
169  worker->doWorkAsync<T>(
170  holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
171  }
172  } catch (...) {
173  holdForLoop.doneWaiting(std::current_exception());
174  }
175  } catch (...) {
176  iHolder.doneWaiting(std::current_exception());
177  }
178  }
#define CMS_SA_ALLOW
std::vector< WorkerManager > workerManagers_
static StreamID invalidStreamID()
Definition: StreamID.h:45
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void handleException(GlobalContext const *, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &)
ProcessContext const * processContext_
long double T
unsigned int numberOfConcurrentLumis_

◆ replaceModule()

void edm::GlobalSchedule::replaceModule ( maker::ModuleHolder *  iMod,
std::string const &  iLabel 
)

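Replace the module used by the workers labelled iLabel with the module held by iMod. Every WorkerManager is searched for a worker with that label; the matching worker is handed to iMod->replaceModuleFor() and then has beginJob() called on it.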

Definition at line 100 of file GlobalSchedule.cc.

References newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), and workerManagers_.

100  {
101  Worker* found = nullptr;
102  for (auto& wm : workerManagers_) {
103  for (auto const& worker : wm.allWorkers()) {
104  if (worker->description()->moduleLabel() == iLabel) {
105  found = worker;
106  break;
107  }
108  }
109  if (nullptr == found) {
110  return;
111  }
112 
113  iMod->replaceModuleFor(found);
114  found->beginJob();
115  }
116  }
std::vector< WorkerManager > workerManagers_

◆ terminate()

bool edm::GlobalSchedule::terminate ( ) const

Return whether each output module has reached its maximum count.

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::GlobalSchedule::actReg_
private

◆ extraWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::GlobalSchedule::extraWorkers_
private

Definition at line 119 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

◆ numberOfConcurrentLumis_

unsigned int edm::GlobalSchedule::numberOfConcurrentLumis_
private

Definition at line 121 of file GlobalSchedule.h.

Referenced by processOneGlobalAsync().

◆ processContext_

ProcessContext const* edm::GlobalSchedule::processContext_
private

Definition at line 120 of file GlobalSchedule.h.

Referenced by processOneGlobalAsync().

◆ workerManagers_

std::vector<WorkerManager> edm::GlobalSchedule::workerManagers_
private