CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::GlobalSchedule Class Reference

#include <GlobalSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void beginJob (ProductRegistry const &, eventsetup::ESRecordsToProxyIndices const &)
 
void endJob (ExceptionCollector &collector)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
 GlobalSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, ProcessContext const *processContext)
 
 GlobalSchedule (GlobalSchedule const &)=delete
 
template<typename T >
void processOneGlobalAsync (WaitingTaskHolder holder, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
bool terminate () const
 Return whether each output module has reached its maximum count. More...
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_
 
ProcessContext const * processContext_
 
std::vector< WorkerManager > workerManagers_
 

Detailed Description

Definition at line 75 of file GlobalSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::GlobalSchedule::AllWorkers

Definition at line 78 of file GlobalSchedule.h.

typedef std::vector<std::string> edm::GlobalSchedule::vstring

Definition at line 77 of file GlobalSchedule.h.

typedef std::shared_ptr<Worker> edm::GlobalSchedule::WorkerPtr

Definition at line 79 of file GlobalSchedule.h.

typedef std::vector<Worker*> edm::GlobalSchedule::Workers

Definition at line 80 of file GlobalSchedule.h.

Constructor & Destructor Documentation

edm::GlobalSchedule::GlobalSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
std::vector< std::string > const &  modulesToUse,
ParameterSet &  proc_pset,
ProductRegistry &  pregistry,
PreallocationConfiguration const &  prealloc,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
ProcessContext const *  processContext 
)

Definition at line 22 of file GlobalSchedule.cc.

References actions, actReg_, extraWorkers_, edm::get_underlying(), edm::ParameterSet::getPSetForUpdate(), mps_fire::i, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), and workerManagers_.

35  : actReg_(areg), processContext_(processContext) {
36  workerManagers_.reserve(prealloc.numberOfLuminosityBlocks());
37  for (unsigned int i = 0; i < prealloc.numberOfLuminosityBlocks(); ++i) {
38  workerManagers_.emplace_back(modReg, areg, actions);
39  }
40  for (auto const& moduleLabel : iModulesToUse) {
41  bool isTracked;
42  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
43  if (modpset != nullptr) { // It will be null for PathStatusInserters, it should
44  // be impossible to be null for anything else
45  assert(isTracked);
46 
47  //side effect keeps this module around
48  for (auto& wm : workerManagers_) {
49  wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
50  }
51  }
52  }
53  if (inserter) {
54  inserter->doPreallocate(prealloc);
55  for (auto& wm : workerManagers_) {
56  WorkerPtr results_inserter(new edm::WorkerT<TriggerResultInserter::ModuleType>(
57  inserter, inserter->moduleDescription(), &actions)); // propagate_const<T> has no reset() function
58  results_inserter->setActivityRegistry(actReg_);
59  wm.addToAllWorkers(results_inserter.get());
60  extraWorkers_.emplace_back(std::move(results_inserter));
61  }
62  }
63 
64  for (auto& pathStatusInserter : pathStatusInserters) {
65  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
66  inserterPtr->doPreallocate(prealloc);
67 
68  for (auto& wm : workerManagers_) {
69  WorkerPtr workerPtr(
70  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
71  workerPtr->setActivityRegistry(actReg_);
72  wm.addToAllWorkers(workerPtr.get());
73  extraWorkers_.emplace_back(std::move(workerPtr));
74  }
75  }
76 
77  for (auto& endPathStatusInserter : endPathStatusInserters) {
78  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
79  inserterPtr->doPreallocate(prealloc);
80  for (auto& wm : workerManagers_) {
81  WorkerPtr workerPtr(new edm::WorkerT<EndPathStatusInserter::ModuleType>(
82  inserterPtr, inserterPtr->moduleDescription(), &actions));
83  workerPtr->setActivityRegistry(actReg_);
84  wm.addToAllWorkers(workerPtr.get());
85  extraWorkers_.emplace_back(std::move(workerPtr));
86  }
87  }
88 
89  } // GlobalSchedule::GlobalSchedule
std::vector< WorkerManager > workerManagers_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
T & get_underlying(propagate_const< T > &)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
ProcessContext const * processContext_
def move(src, dest)
Definition: eostools.py:511
std::vector< edm::propagate_const< WorkerPtr > > extraWorkers_
edm::GlobalSchedule::GlobalSchedule ( GlobalSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::GlobalSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 149 of file GlobalSchedule.h.

149 { return workerManagers_[0].actionTable(); }
std::vector< WorkerManager > workerManagers_
AllWorkers const& edm::GlobalSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 125 of file GlobalSchedule.h.

Referenced by getAllModuleDescriptions().

125 { return workerManagers_[0].allWorkers(); }
std::vector< WorkerManager > workerManagers_
void edm::GlobalSchedule::beginJob ( ProductRegistry const &  iRegistry,
eventsetup::ESRecordsToProxyIndices const &  iESIndices 
)

Definition at line 93 of file GlobalSchedule.cc.

References workerManagers_.

94  {
95  workerManagers_[0].beginJob(iRegistry, iESIndices);
96  }
std::vector< WorkerManager > workerManagers_
void edm::GlobalSchedule::endJob ( ExceptionCollector &  collector)

Definition at line 91 of file GlobalSchedule.cc.

References workerManagers_.

91 { workerManagers_[0].endJob(collector); }
std::vector< WorkerManager > workerManagers_
std::vector< ModuleDescription const * > edm::GlobalSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this GlobalSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 116 of file GlobalSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

116  {
117  std::vector<ModuleDescription const*> result;
118  result.reserve(allWorkers().size());
119 
120  for (auto const& worker : allWorkers()) {
121  ModuleDescription const* p = worker->descPtr();
122  result.push_back(p);
123  }
124  return result;
125  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void edm::GlobalSchedule::getTriggerReport ( TriggerReport &  rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

template<typename T >
void edm::GlobalSchedule::processOneGlobalAsync ( WaitingTaskHolder  holder,
typename T::MyPrincipal &  principal,
EventSetupImpl const &  eventSetup,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 158 of file GlobalSchedule.h.

References edm::addContextAndPrintException(), lumiQTWidget::aw, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, edm::StreamID::invalidStreamID(), edm::make_waiting_task(), groupFilesInBlocks::reverse, and edm::convertException::wrap().

162  {
163  try {
164  //need the doneTask to own the memory
165  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));
166 
167  if (actReg_) {
168  //Services may depend upon each other
169  ServiceRegistry::Operate op(token);
170  T::preScheduleSignal(actReg_.get(), globalContext.get());
171  }
172 
173  auto doneTask = make_waiting_task(
174  tbb::task::allocate_root(),
175  [this, iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable {
176  std::exception_ptr excpt;
177  if (iPtr) {
178  excpt = *iPtr;
179  //add context information to the exception and print message
180  try {
181  convertException::wrap([&]() { std::rethrow_exception(excpt); });
182  } catch (cms::Exception& ex) {
183  //TODO: should add the transition type info
184  std::ostringstream ost;
185  if (ex.context().empty()) {
186  ost << "Processing " << T::transitionName() << " ";
187  }
188  ServiceRegistry::Operate op(token);
189  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
190  excpt = std::current_exception();
191  }
192  if (actReg_) {
193  ServiceRegistry::Operate op(token);
194  actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
195  }
196  }
197  if (actReg_) {
198  try {
199  ServiceRegistry::Operate op(token);
200  T::postScheduleSignal(actReg_.get(), globalContext.get());
201  } catch (...) {
202  if (not excpt) {
203  excpt = std::current_exception();
204  }
205  }
206  }
207  iHolder.doneWaiting(excpt);
208  });
209  workerManagers_[ep.index()].resetAll();
210 
211  ParentContext parentContext(globalContext.get());
212  //make sure the ProductResolvers know about their
213  // workers to allow proper data dependency handling
214  workerManagers_[ep.index()].setupOnDemandSystem(ep, es);
215 
216  //make sure the task doesn't get run until all workers have been started
217  WaitingTaskHolder holdForLoop(doneTask);
218  auto& aw = workerManagers_[ep.index()].allWorkers();
219  for (Worker* worker : boost::adaptors::reverse(aw)) {
220  worker->doWorkAsync<T>(
221  doneTask, ep, es, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
222  }
223  } catch (...) {
224  iHolder.doneWaiting(std::current_exception());
225  }
226  }
std::vector< WorkerManager > workerManagers_
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
static StreamID invalidStreamID()
Definition: StreamID.h:44
std::list< std::string > const & context() const
Definition: Exception.cc:147
std::shared_ptr< ActivityRegistry > actReg_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
long double T
void edm::GlobalSchedule::replaceModule ( maker::ModuleHolder *  iMod,
std::string const &  iLabel 
)

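Replaces the module run by the worker labelled iLabel with the module held by iMod, then calls beginJob() on that worker. (Original brief: "clone the type of module with label iLabel but configure with iPSet." Note that this signature has no iPSet parameter.)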

Definition at line 98 of file GlobalSchedule.cc.

References edm::Worker::beginJob(), runEdmFileComparison::found, edm::maker::ModuleHolder::replaceModuleFor(), and workerManagers_.

98  {
99  Worker* found = nullptr;
100  for (auto& wm : workerManagers_) {
101  for (auto const& worker : wm.allWorkers()) {
102  if (worker->description().moduleLabel() == iLabel) {
103  found = worker;
104  break;
105  }
106  }
107  if (nullptr == found) {
108  return;
109  }
110 
111  iMod->replaceModuleFor(found);
112  found->beginJob();
113  }
114  }
std::vector< WorkerManager > workerManagers_
bool edm::GlobalSchedule::terminate ( ) const

Return whether each output module has reached its maximum count.

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::GlobalSchedule::actReg_
private

Definition at line 152 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::GlobalSchedule::extraWorkers_
private

Definition at line 153 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

ProcessContext const* edm::GlobalSchedule::processContext_
private

Definition at line 154 of file GlobalSchedule.h.

std::vector<WorkerManager> edm::GlobalSchedule::workerManagers_
private

Definition at line 151 of file GlobalSchedule.h.

Referenced by beginJob(), endJob(), GlobalSchedule(), and replaceModule().