CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::GlobalSchedule Class Reference

#include <GlobalSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void beginJob (ProductRegistry const &, eventsetup::ESRecordsToProductResolverIndices const &, ProcessBlockHelperBase const &)
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endJob (ExceptionCollector &collector)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
 GlobalSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, ProcessContext const *processContext)
 
 GlobalSchedule (GlobalSchedule const &)=delete
 
template<typename T >
void processOneGlobalAsync (WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
bool terminate () const
 Return whether each output module has reached its maximum count. More...
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_
 
unsigned int numberOfConcurrentLumis_
 
ProcessContext const * processContext_
 
std::vector< WorkerManagerworkerManagers_
 

Detailed Description

Definition at line 81 of file GlobalSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::GlobalSchedule::AllWorkers

Definition at line 84 of file GlobalSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::GlobalSchedule::vstring

Definition at line 83 of file GlobalSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::GlobalSchedule::WorkerPtr

Definition at line 85 of file GlobalSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::GlobalSchedule::Workers

Definition at line 86 of file GlobalSchedule.h.

Constructor & Destructor Documentation

◆ GlobalSchedule() [1/2]

edm::GlobalSchedule::GlobalSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
std::vector< std::string > const &  modulesToUse,
ParameterSet proc_pset,
ProductRegistry pregistry,
PreallocationConfiguration const &  prealloc,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
ProcessContext const *  processContext 
)

Definition at line 22 of file GlobalSchedule.cc.

References actions, actReg_, cms::cuda::assert(), extraWorkers_, edm::get_underlying(), edm::ParameterSet::getPSetForUpdate(), mps_fire::i, HerwigMaxPtPartonFilter_cfi::moduleLabel, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), and workerManagers_.

35  : actReg_(areg), processContext_(processContext), numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()) {
36  unsigned int nManagers = prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns();
37  workerManagers_.reserve(nManagers);
38  for (unsigned int i = 0; i < nManagers; ++i) {
39  workerManagers_.emplace_back(modReg, areg, actions);
40  }
41  for (auto const& moduleLabel : iModulesToUse) {
42  bool isTracked;
43  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
44  if (modpset != nullptr) { // It will be null for PathStatusInserters, it should
45  // be impossible to be null for anything else
46  assert(isTracked);
47 
48  //side effect keeps this module around
49  for (auto& wm : workerManagers_) {
50  wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
51  }
52  }
53  }
54  if (inserter) {
55  inserter->doPreallocate(prealloc);
56  for (auto& wm : workerManagers_) {
58  inserter, inserter->moduleDescription(), &actions)); // propagate_const<T> has no reset() function
59  results_inserter->setActivityRegistry(actReg_);
60  wm.addToAllWorkers(results_inserter.get());
61  extraWorkers_.emplace_back(std::move(results_inserter));
62  }
63  }
64 
65  for (auto& pathStatusInserter : pathStatusInserters) {
66  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
67  inserterPtr->doPreallocate(prealloc);
68 
69  for (auto& wm : workerManagers_) {
70  WorkerPtr workerPtr(
71  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
72  workerPtr->setActivityRegistry(actReg_);
73  wm.addToAllWorkers(workerPtr.get());
74  extraWorkers_.emplace_back(std::move(workerPtr));
75  }
76  }
77 
78  for (auto& endPathStatusInserter : endPathStatusInserters) {
79  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
80  inserterPtr->doPreallocate(prealloc);
81  for (auto& wm : workerManagers_) {
83  inserterPtr, inserterPtr->moduleDescription(), &actions));
84  workerPtr->setActivityRegistry(actReg_);
85  wm.addToAllWorkers(workerPtr.get());
86  extraWorkers_.emplace_back(std::move(workerPtr));
87  }
88  }
89 
90  } // GlobalSchedule::GlobalSchedule
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
assert(be >=bs)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
constexpr T & get_underlying(propagate_const< T > &)
ProcessContext const * processContext_
unsigned int numberOfConcurrentLumis_
def move(src, dest)
Definition: eostools.py:511
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_

◆ GlobalSchedule() [2/2]

edm::GlobalSchedule::GlobalSchedule ( GlobalSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::GlobalSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 155 of file GlobalSchedule.h.

References workerManagers_.

155 { return workerManagers_[0].actionTable(); }
std::vector< WorkerManager > workerManagers_

◆ allWorkers()

AllWorkers const& edm::GlobalSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 131 of file GlobalSchedule.h.

References workerManagers_.

Referenced by getAllModuleDescriptions().

131 { return workerManagers_[0].allWorkers(); }
std::vector< WorkerManager > workerManagers_

◆ beginJob()

void edm::GlobalSchedule::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProductResolverIndices const &  iESIndices,
ProcessBlockHelperBase const &  processBlockHelperBase 
)

Definition at line 94 of file GlobalSchedule.cc.

References workerManagers_.

96  {
97  workerManagers_[0].beginJob(iRegistry, iESIndices, processBlockHelperBase);
98  }
std::vector< WorkerManager > workerManagers_

◆ deleteModule()

void edm::GlobalSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 118 of file GlobalSchedule.cc.

References workerManagers_.

118  {
119  for (auto& wm : workerManagers_) {
120  wm.deleteModuleIfExists(iLabel);
121  }
122  }
std::vector< WorkerManager > workerManagers_

◆ endJob()

void edm::GlobalSchedule::endJob ( ExceptionCollector collector)

Definition at line 92 of file GlobalSchedule.cc.

References workerManagers_.

92 { workerManagers_[0].endJob(collector); }
std::vector< WorkerManager > workerManagers_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::GlobalSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this GlobalSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 124 of file GlobalSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

124  {
125  std::vector<ModuleDescription const*> result;
126  result.reserve(allWorkers().size());
127 
128  for (auto const& worker : allWorkers()) {
129  ModuleDescription const* p = worker->description();
130  result.push_back(p);
131  }
132  return result;
133  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ processOneGlobalAsync()

template<typename T >
void edm::GlobalSchedule::processOneGlobalAsync ( WaitingTaskHolder  holder,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 165 of file GlobalSchedule.h.

References actReg_, edm::addContextAndPrintException(), edm::WorkerManager::allWorkers(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), edm::InRun, edm::StreamID::invalidStreamID(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), numberOfConcurrentLumis_, findAndChange::op, processContext_, edm::WorkerManager::resetAll(), groupFilesInBlocks::reverse, edm::WorkerManager::setupResolvers(), unpackBuffers-CaloStage2::token, edm::transitionName(), workerManagers_, and edm::convertException::wrap().

168  {
169  auto const& principal = transitionInfo.principal();
170 
171  // Caught exception is propagated via WaitingTaskHolder
172  CMS_SA_ALLOW try {
173  //need the doneTask to own the memory
174  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
175 
176  if (actReg_) {
177  //Services may depend upon each other
179  T::preScheduleSignal(actReg_.get(), globalContext.get());
180  }
181 
182  ServiceWeakToken weakToken = token;
183  auto doneTask = make_waiting_task(
184  [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
185  std::exception_ptr excpt;
186  if (iPtr) {
187  excpt = *iPtr;
188  //add context information to the exception and print message
189  try {
190  convertException::wrap([&]() { std::rethrow_exception(excpt); });
191  } catch (cms::Exception& ex) {
192  //TODO: should add the transition type info
193  std::ostringstream ost;
194  if (ex.context().empty()) {
195  ost << "Processing " << T::transitionName() << " ";
196  }
197  ServiceRegistry::Operate op(weakToken.lock());
198  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
199  excpt = std::current_exception();
200  }
201  if (actReg_) {
202  ServiceRegistry::Operate op(weakToken.lock());
203  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
204  }
205  }
206  if (actReg_) {
207  // Caught exception is propagated via WaitingTaskHolder
208  CMS_SA_ALLOW try {
209  ServiceRegistry::Operate op(weakToken.lock());
210  T::postScheduleSignal(actReg_.get(), globalContext.get());
211  } catch (...) {
212  if (not excpt) {
213  excpt = std::current_exception();
214  }
215  }
216  }
217  iHolder.doneWaiting(excpt);
218  });
219  unsigned int managerIndex = principal.index();
220  if constexpr (T::branchType_ == InRun) {
221  managerIndex += numberOfConcurrentLumis_;
222  }
223  WorkerManager& workerManager = workerManagers_[managerIndex];
224  workerManager.resetAll();
225 
226  ParentContext parentContext(globalContext.get());
227  //make sure the ProductResolvers know about their
228  // workers to allow proper data dependency handling
229  workerManager.setupResolvers(transitionInfo.principal());
230 
231  //make sure the task doesn't get run until all workers have beens started
232  WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
233  auto& aw = workerManager.allWorkers();
234  for (Worker* worker : boost::adaptors::reverse(aw)) {
235  worker->doWorkAsync<T>(
236  holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
237  }
238  } catch (...) {
239  iHolder.doneWaiting(std::current_exception());
240  }
241  }
std::string_view transitionName(GlobalContext::Transition)
#define CMS_SA_ALLOW
std::vector< WorkerManager > workerManagers_
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
static StreamID invalidStreamID()
Definition: StreamID.h:45
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::shared_ptr< ActivityRegistry > actReg_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
unsigned int numberOfConcurrentLumis_

◆ replaceModule()

void edm::GlobalSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 100 of file GlobalSchedule.cc.

References newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), and workerManagers_.

100  {
101  Worker* found = nullptr;
102  for (auto& wm : workerManagers_) {
103  for (auto const& worker : wm.allWorkers()) {
104  if (worker->description()->moduleLabel() == iLabel) {
105  found = worker;
106  break;
107  }
108  }
109  if (nullptr == found) {
110  return;
111  }
112 
113  iMod->replaceModuleFor(found);
114  found->beginJob();
115  }
116  }
std::vector< WorkerManager > workerManagers_

◆ terminate()

bool edm::GlobalSchedule::terminate ( ) const

Return whether each output module has reached its maximum count.

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::GlobalSchedule::actReg_
private

Definition at line 158 of file GlobalSchedule.h.

Referenced by GlobalSchedule(), and processOneGlobalAsync().

◆ extraWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::GlobalSchedule::extraWorkers_
private

Definition at line 159 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

◆ numberOfConcurrentLumis_

unsigned int edm::GlobalSchedule::numberOfConcurrentLumis_
private

Definition at line 161 of file GlobalSchedule.h.

Referenced by processOneGlobalAsync().

◆ processContext_

ProcessContext const* edm::GlobalSchedule::processContext_
private

Definition at line 160 of file GlobalSchedule.h.

Referenced by processOneGlobalAsync().

◆ workerManagers_

std::vector<WorkerManager> edm::GlobalSchedule::workerManagers_
private