CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::GlobalSchedule Class Reference

#include <GlobalSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void beginJob (ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &)
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endJob (ExceptionCollector &collector)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
 GlobalSchedule (GlobalSchedule const &)=delete
 
 GlobalSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, ProcessContext const *processContext)
 
template<typename T >
void processOneGlobalAsync (WaitingTaskHolder holder, typename T::TransitionInfoType &, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
bool terminate () const
 Return whether each output module has reached its maximum count. More...
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_
 
ProcessContext const * processContext_
 
std::vector< WorkerManager > workerManagers_
 

Detailed Description

Definition at line 80 of file GlobalSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::GlobalSchedule::AllWorkers

Definition at line 83 of file GlobalSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::GlobalSchedule::vstring

Definition at line 82 of file GlobalSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::GlobalSchedule::WorkerPtr

Definition at line 84 of file GlobalSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::GlobalSchedule::Workers

Definition at line 85 of file GlobalSchedule.h.

Constructor & Destructor Documentation

◆ GlobalSchedule() [1/2]

edm::GlobalSchedule::GlobalSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
std::vector< std::string > const &  modulesToUse,
ParameterSet &  proc_pset,
ProductRegistry &  pregistry,
PreallocationConfiguration const &  prealloc,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
ProcessContext const *  processContext 
)

Definition at line 22 of file GlobalSchedule.cc.

35  : actReg_(areg), processContext_(processContext) {
36  workerManagers_.reserve(prealloc.numberOfLuminosityBlocks());
37  for (unsigned int i = 0; i < prealloc.numberOfLuminosityBlocks(); ++i) {
38  workerManagers_.emplace_back(modReg, areg, actions);
39  }
40  for (auto const& moduleLabel : iModulesToUse) {
41  bool isTracked;
42  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
43  if (modpset != nullptr) { // It will be null for PathStatusInserters, it should
44  // be impossible to be null for anything else
45  assert(isTracked);
46 
47  //side effect keeps this module around
48  for (auto& wm : workerManagers_) {
49  wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
50  }
51  }
52  }
53  if (inserter) {
54  inserter->doPreallocate(prealloc);
55  for (auto& wm : workerManagers_) {
56  auto results_inserter = WorkerPtr(new edm::WorkerT<TriggerResultInserter::ModuleType>(
57  inserter, inserter->moduleDescription(), &actions)); // propagate_const<T> has no reset() function
58  results_inserter->setActivityRegistry(actReg_);
59  wm.addToAllWorkers(results_inserter.get());
60  extraWorkers_.emplace_back(std::move(results_inserter));
61  }
62  }
63 
64  for (auto& pathStatusInserter : pathStatusInserters) {
65  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
66  inserterPtr->doPreallocate(prealloc);
67 
68  for (auto& wm : workerManagers_) {
69  WorkerPtr workerPtr(
70  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
71  workerPtr->setActivityRegistry(actReg_);
72  wm.addToAllWorkers(workerPtr.get());
73  extraWorkers_.emplace_back(std::move(workerPtr));
74  }
75  }
76 
77  for (auto& endPathStatusInserter : endPathStatusInserters) {
78  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
79  inserterPtr->doPreallocate(prealloc);
80  for (auto& wm : workerManagers_) {
81  WorkerPtr workerPtr(new edm::WorkerT<EndPathStatusInserter::ModuleType>(
82  inserterPtr, inserterPtr->moduleDescription(), &actions));
83  workerPtr->setActivityRegistry(actReg_);
84  wm.addToAllWorkers(workerPtr.get());
85  extraWorkers_.emplace_back(std::move(workerPtr));
86  }
87  }
88 
89  } // GlobalSchedule::GlobalSchedule

References actions, actReg_, cms::cuda::assert(), extraWorkers_, edm::get_underlying(), edm::ParameterSet::getPSetForUpdate(), mps_fire::i, HerwigMaxPtPartonFilter_cfi::moduleLabel, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), and workerManagers_.

◆ GlobalSchedule() [2/2]

edm::GlobalSchedule::GlobalSchedule ( GlobalSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::GlobalSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 152 of file GlobalSchedule.h.

152 { return workerManagers_[0].actionTable(); }

References workerManagers_.

◆ allWorkers()

AllWorkers const& edm::GlobalSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 128 of file GlobalSchedule.h.

128 { return workerManagers_[0].allWorkers(); }

References workerManagers_.

Referenced by getAllModuleDescriptions().

◆ beginJob()

void edm::GlobalSchedule::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProxyIndices const &  iESIndices 
)

Definition at line 93 of file GlobalSchedule.cc.

94  {
95  workerManagers_[0].beginJob(iRegistry, iESIndices);
96  }

References workerManagers_.

◆ deleteModule()

void edm::GlobalSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 116 of file GlobalSchedule.cc.

116  {
117  for (auto& wm : workerManagers_) {
118  wm.deleteModuleIfExists(iLabel);
119  }
120  }

References workerManagers_.

◆ endJob()

void edm::GlobalSchedule::endJob ( ExceptionCollector &  collector)

Definition at line 91 of file GlobalSchedule.cc.

91 { workerManagers_[0].endJob(collector); }

References workerManagers_.

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::GlobalSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this GlobalSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 122 of file GlobalSchedule.cc.

122  {
123  std::vector<ModuleDescription const*> result;
124  result.reserve(allWorkers().size());
125 
126  for (auto const& worker : allWorkers()) {
127  ModuleDescription const* p = worker->description();
128  result.push_back(p);
129  }
130  return result;
131  }

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

◆ processOneGlobalAsync()

template<typename T >
void edm::GlobalSchedule::processOneGlobalAsync ( WaitingTaskHolder  holder,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 161 of file GlobalSchedule.h.

164  {
165  auto const& principal = transitionInfo.principal();
166 
167  // Caught exception is propagated via WaitingTaskHolder
168  CMS_SA_ALLOW try {
169  //need the doneTask to own the memory
170  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
171 
172  if (actReg_) {
173  //Services may depend upon each other
174  ServiceRegistry::Operate op(token);
175  T::preScheduleSignal(actReg_.get(), globalContext.get());
176  }
177 
178  ServiceWeakToken weakToken = token;
179  auto doneTask = make_waiting_task(
180  [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
181  std::exception_ptr excpt;
182  if (iPtr) {
183  excpt = *iPtr;
184  //add context information to the exception and print message
185  try {
186  convertException::wrap([&]() { std::rethrow_exception(excpt); });
187  } catch (cms::Exception& ex) {
188  //TODO: should add the transition type info
189  std::ostringstream ost;
190  if (ex.context().empty()) {
191  ost << "Processing " << T::transitionName() << " ";
192  }
193  ServiceRegistry::Operate op(weakToken.lock());
194  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
195  excpt = std::current_exception();
196  }
197  if (actReg_) {
198  ServiceRegistry::Operate op(weakToken.lock());
199  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
200  }
201  }
202  if (actReg_) {
203  // Caught exception is propagated via WaitingTaskHolder
204  CMS_SA_ALLOW try {
205  ServiceRegistry::Operate op(weakToken.lock());
206  T::postScheduleSignal(actReg_.get(), globalContext.get());
207  } catch (...) {
208  if (not excpt) {
209  excpt = std::current_exception();
210  }
211  }
212  }
213  iHolder.doneWaiting(excpt);
214  });
215  WorkerManager& workerManager = workerManagers_[principal.index()];
216  workerManager.resetAll();
217 
218  ParentContext parentContext(globalContext.get());
219  //make sure the ProductResolvers know about their
220  // workers to allow proper data dependency handling
221  workerManager.setupResolvers(transitionInfo.principal());
222 
223  //make sure the task doesn't get run until all workers have been started
224  WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
225  auto& aw = workerManager.allWorkers();
226  for (Worker* worker : boost::adaptors::reverse(aw)) {
227  worker->doWorkAsync<T>(
228  holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
229  }
230  } catch (...) {
231  iHolder.doneWaiting(std::current_exception());
232  }
233  }

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), edm::StreamID::invalidStreamID(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), processContext_, groupFilesInBlocks::reverse, unpackBuffers-CaloStage2::token, workerManagers_, and edm::convertException::wrap().

◆ replaceModule()

void edm::GlobalSchedule::replaceModule ( maker::ModuleHolder *  iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 98 of file GlobalSchedule.cc.

98  {
99  Worker* found = nullptr;
100  for (auto& wm : workerManagers_) {
101  for (auto const& worker : wm.allWorkers()) {
102  if (worker->description()->moduleLabel() == iLabel) {
103  found = worker;
104  break;
105  }
106  }
107  if (nullptr == found) {
108  return;
109  }
110 
111  iMod->replaceModuleFor(found);
112  found->beginJob();
113  }
114  }

References newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), and workerManagers_.

◆ terminate()

bool edm::GlobalSchedule::terminate ( ) const

Return whether each output module has reached its maximum count.

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::GlobalSchedule::actReg_
private

Definition at line 155 of file GlobalSchedule.h.

Referenced by GlobalSchedule(), and processOneGlobalAsync().

◆ extraWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::GlobalSchedule::extraWorkers_
private

Definition at line 156 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

◆ processContext_

ProcessContext const* edm::GlobalSchedule::processContext_
private

Definition at line 157 of file GlobalSchedule.h.

Referenced by processOneGlobalAsync().

◆ workerManagers_

std::vector<WorkerManager> edm::GlobalSchedule::workerManagers_
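private

Definition at line 154 of file GlobalSchedule.h.

Referenced by actionTable(), allWorkers(), beginJob(), deleteModule(), endJob(), GlobalSchedule(), processOneGlobalAsync(), and replaceModule().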
edm::GlobalSchedule::allWorkers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: GlobalSchedule.h:128
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
cms::cuda::assert
assert(be >=bs)
edm::WorkerT
Definition: Frameworkfwd.h:62
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::GlobalSchedule::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: GlobalSchedule.h:155
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
WaitingTaskHolder
edm::GlobalSchedule::processContext_
ProcessContext const * processContext_
Definition: GlobalSchedule.h:157
edm::GlobalSchedule::workerManagers_
std::vector< WorkerManager > workerManagers_
Definition: GlobalSchedule.h:154
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
ParameterSet
Definition: Functions.h:16
edm::StreamID::invalidStreamID
static StreamID invalidStreamID()
Definition: StreamID.h:45
edm::get_underlying
constexpr T & get_underlying(propagate_const< T > &)
Definition: propagate_const.h:103
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
edm::GlobalSchedule::extraWorkers_
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_
Definition: GlobalSchedule.h:156
edm::GlobalSchedule::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition: GlobalSchedule.h:84
eostools.move
def move(src, dest)
Definition: eostools.py:511
T
long double T
Definition: Basic3DVectorLD.h:48
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
mps_fire.result
result
Definition: mps_fire.py:311
cms::Exception
Definition: Exception.h:70
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316