CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::GlobalSchedule Class Reference

#include <GlobalSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void beginJob (ProductRegistry const &)
 
void endJob (ExceptionCollector &collector)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
 GlobalSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry > modReg, std::vector< std::string > const &modulesToUse, ParameterSet &proc_pset, ProductRegistry &pregistry, PreallocationConfiguration const &prealloc, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, ProcessContext const *processContext)
 
 GlobalSchedule (GlobalSchedule const &)=delete
 
template<typename T >
void processOneGlobal (typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
template<typename T >
void processOneGlobalAsync (WaitingTaskHolder holder, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
bool terminate () const
 Return whether each output module has reached its maximum count. More...
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
template<typename T >
void runNow (typename T::MyPrincipal const &p, EventSetup const &es, GlobalContext const *context)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
ProcessContext const * processContext_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 75 of file GlobalSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::GlobalSchedule::AllWorkers

Definition at line 78 of file GlobalSchedule.h.

typedef std::vector<std::string> edm::GlobalSchedule::vstring

Definition at line 77 of file GlobalSchedule.h.

typedef std::shared_ptr<Worker> edm::GlobalSchedule::WorkerPtr

Definition at line 79 of file GlobalSchedule.h.

typedef std::vector<Worker*> edm::GlobalSchedule::Workers

Definition at line 80 of file GlobalSchedule.h.

Constructor & Destructor Documentation

edm::GlobalSchedule::GlobalSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
std::vector< std::string > const &  modulesToUse,
ParameterSet proc_pset,
ProductRegistry pregistry,
PreallocationConfiguration const &  prealloc,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration processConfiguration,
ProcessContext const *  processContext 
)

Definition at line 21 of file GlobalSchedule.cc.

References actions, actReg_, addToAllWorkers(), endPathStatusInserterWorkers_, edm::propagate_const< T >::get(), edm::get_underlying(), edm::ParameterSet::getPSetForUpdate(), edm::WorkerManager::getWorker(), pathStatusInserterWorkers_, results_inserter_, and workerManager_.

32  :
33  workerManager_(modReg,areg,actions),
34  actReg_(areg),
35  processContext_(processContext)
36  {
37  for (auto const& moduleLabel : iModulesToUse) {
38  bool isTracked;
39  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
40  if (modpset != nullptr) { // It will be null for PathStatusInserters, it should
41  // be impossible to be null for anything else
42  assert(isTracked);
43 
44  //side effect keeps this module around
45  addToAllWorkers(workerManager_.getWorker(*modpset, pregistry, &prealloc,processConfiguration, moduleLabel));
46  }
47  }
48  if(inserter) {
49  results_inserter_ = WorkerPtr(new edm::WorkerT<TriggerResultInserter::ModuleType>(inserter, inserter->moduleDescription(), &actions)); // propagate_const<T> has no reset() function
50  inserter->doPreallocate(prealloc);
51  results_inserter_->setActivityRegistry(actReg_);
53  }
54 
55  for(auto & pathStatusInserter : pathStatusInserters) {
56  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
58  inserterPtr->moduleDescription(),
59  &actions));
60  pathStatusInserterWorkers_.emplace_back(workerPtr);
61  inserterPtr->doPreallocate(prealloc);
62  workerPtr->setActivityRegistry(actReg_);
63  addToAllWorkers(workerPtr.get());
64  }
65 
66  for(auto & endPathStatusInserter : endPathStatusInserters) {
67  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
69  inserterPtr->moduleDescription(),
70  &actions));
71  endPathStatusInserterWorkers_.emplace_back(workerPtr);
72  inserterPtr->doPreallocate(prealloc);
73  workerPtr->setActivityRegistry(actReg_);
74  addToAllWorkers(workerPtr.get());
75  }
76 
77  } // GlobalSchedule::GlobalSchedule
edm::propagate_const< WorkerPtr > results_inserter_
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void addToAllWorkers(Worker *w)
T & get_underlying(propagate_const< T > &)
std::shared_ptr< Worker > WorkerPtr
element_type const * get() const
std::shared_ptr< ActivityRegistry > actReg_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
ProcessContext const * processContext_
WorkerManager workerManager_
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
edm::GlobalSchedule::GlobalSchedule ( GlobalSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::GlobalSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 162 of file GlobalSchedule.h.

References w.

162  {
163  return workerManager_.actionTable();
164  }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:79
void edm::GlobalSchedule::addToAllWorkers ( Worker w)
private

Definition at line 117 of file GlobalSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by GlobalSchedule().

117  {
119  }
const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::GlobalSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 129 of file GlobalSchedule.h.

Referenced by getAllModuleDescriptions(), and replaceModule().

129  {
130  return workerManager_.allWorkers();
131  }
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:75
WorkerManager workerManager_
void edm::GlobalSchedule::beginJob ( ProductRegistry const &  iRegistry)

Definition at line 83 of file GlobalSchedule.cc.

References edm::WorkerManager::beginJob(), and workerManager_.

83  {
84  workerManager_.beginJob(iRegistry);
85  }
void beginJob(ProductRegistry const &iRegistry)
WorkerManager workerManager_
void edm::GlobalSchedule::endJob ( ExceptionCollector collector)

Definition at line 79 of file GlobalSchedule.cc.

References edm::WorkerManager::endJob(), and workerManager_.

79  {
80  workerManager_.endJob(collector);
81  }
WorkerManager workerManager_
std::vector< ModuleDescription const * > edm::GlobalSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this GlobalSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 105 of file GlobalSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

105  {
106  std::vector<ModuleDescription const*> result;
107  result.reserve(allWorkers().size());
108 
109  for (auto const& worker : allWorkers()) {
110  ModuleDescription const* p = worker->descPtr();
111  result.push_back(p);
112  }
113  return result;
114  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void edm::GlobalSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

template<typename T >
void edm::GlobalSchedule::processOneGlobal ( typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 180 of file GlobalSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), edm::StreamID::invalidStreamID(), and edm::convertException::wrap().

182  {
183  GlobalContext globalContext = T::makeGlobalContext(ep, processContext_);
184 
185  GlobalScheduleSignalSentry<T> sentry(actReg_.get(), &globalContext);
186 
187  SendTerminationSignalIfException terminationSentry(actReg_.get(), &globalContext);
188 
189  //If we are in an end transition, we need to reset failed items since they might
190  // be set this time around
191  if( not T::begin_) {
192  ep.resetFailedFromThisProcess();
193  }
194  // This call takes care of the unscheduled processing.
195  workerManager_.processOneOccurrence<T>(ep, es, StreamID::invalidStreamID(), &globalContext, &globalContext, cleaningUpAfterException);
196 
197  try {
198  convertException::wrap([&]() {
199  runNow<T>(ep,es,&globalContext);
200  });
201  }
202  catch(cms::Exception& ex) {
203  if (ex.context().empty()) {
204  addContextAndPrintException("Calling function GlobalSchedule::processOneGlobal", ex, cleaningUpAfterException);
205  } else {
206  addContextAndPrintException("", ex, cleaningUpAfterException);
207  }
208  throw;
209  }
210  terminationSentry.completedSuccessfully();
211 
212  //If we got here no other exception has happened so we can propogate any Service related exceptions
213  sentry.allowThrow();
214  }
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
static StreamID invalidStreamID()
Definition: StreamID.h:48
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::shared_ptr< ActivityRegistry > actReg_
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
WorkerManager workerManager_
long double T
template<typename T >
void edm::GlobalSchedule::processOneGlobalAsync ( WaitingTaskHolder  holder,
typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 218 of file GlobalSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, edm::ServiceRegistry::instance(), edm::StreamID::invalidStreamID(), edm::make_waiting_task(), edm::ServiceRegistry::presentToken(), groupFilesInBlocks::reverse, and edm::convertException::wrap().

221  {
223 
224  //need the doneTask to own the memory
225  auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));
226 
227  if(actReg_) {
228  T::preScheduleSignal(actReg_.get(), globalContext.get());
229  }
230 
231 
232  //If we are in an end transition, we need to reset failed items since they might
233  // be set this time around
234  if( not T::begin_) {
235  ep.resetFailedFromThisProcess();
236  }
237 
238  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
239  [this,iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable
240  {
241  ServiceRegistry::Operate op(token);
242  std::exception_ptr excpt;
243  if(iPtr) {
244  excpt = *iPtr;
245  //add context information to the exception and print message
246  try {
247  convertException::wrap([&]() {
248  std::rethrow_exception(excpt);
249  });
250  } catch(cms::Exception& ex) {
251  //TODO: should add the transition type info
252  std::ostringstream ost;
253  if(ex.context().empty()) {
254  ost<<"Processing "<<T::transitionName()<<" ";
255  }
256  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
257  excpt = std::current_exception();
258  }
259  if(actReg_) {
260  actReg_->preGlobalEarlyTerminationSignal_(*globalContext,TerminationOrigin::ExceptionFromThisContext);
261  }
262  }
263  if(actReg_) {
264  try {
265  T::postScheduleSignal(actReg_.get(), globalContext.get());
266  } catch(...) {
267  if(not excpt) {
268  excpt = std::current_exception();
269  }
270  }
271  }
272  iHolder.doneWaiting(excpt);
273 
274  });
276 
277  ParentContext parentContext(globalContext.get());
278  //make sure the ProductResolvers know about their
279  // workers to allow proper data dependency handling
281 
282  //make sure the task doesn't get run until all workers have beens started
283  WaitingTaskHolder holdForLoop(doneTask);
284  for(auto& worker: boost::adaptors::reverse((allWorkers()))) {
285  worker->doWorkAsync<T>(doneTask,ep,es,StreamID::invalidStreamID(),parentContext,globalContext.get());
286  }
287 
288  }
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
static StreamID invalidStreamID()
Definition: StreamID.h:48
ServiceToken presentToken() const
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::shared_ptr< ActivityRegistry > actReg_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
auto wrap(F iFunc) -> decltype(iFunc())
ProcessContext const * processContext_
WorkerManager workerManager_
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
long double T
void edm::GlobalSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 87 of file GlobalSchedule.cc.

References allWorkers(), edm::Worker::beginJob(), runEdmFileComparison::found, and edm::maker::ModuleHolder::replaceModuleFor().

88  {
89  Worker* found = nullptr;
90  for (auto const& worker : allWorkers()) {
91  if (worker->description().moduleLabel() == iLabel) {
92  found = worker;
93  break;
94  }
95  }
96  if (nullptr == found) {
97  return;
98  }
99 
100  iMod->replaceModuleFor(found);
101  found->beginJob();
102  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
template<typename T >
void edm::GlobalSchedule::runNow ( typename T::MyPrincipal const &  p,
EventSetup const &  es,
GlobalContext const *  context 
)
private

Definition at line 292 of file GlobalSchedule.h.

References cms::Exception::addContext(), cms::Exception::context(), edm::StreamID::invalidStreamID(), and AlCaHLTBitMon_ParallelJobs::p.

293  {
294  //do nothing for event since we will run when requested
295  for(auto & worker: allWorkers()) {
296  try {
297  ParentContext parentContext(context);
298  worker->doWork<T>(p, es,StreamID::invalidStreamID(), parentContext, context);
299  }
300  catch (cms::Exception & ex) {
301  if(ex.context().empty()) {
302  std::ostringstream ost;
303  ost << "Processing " <<T::transitionName()<<" "<< p.id();
304  ex.addContext(ost.str());
305  }
306  throw;
307  }
308  }
309  }
static StreamID invalidStreamID()
Definition: StreamID.h:48
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::list< std::string > const & context() const
Definition: Exception.cc:191
void addContext(std::string const &context)
Definition: Exception.cc:227
long double T
bool edm::GlobalSchedule::terminate ( ) const

Return whether each output module has reached its maximum count.

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::GlobalSchedule::actReg_
private

Definition at line 169 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::GlobalSchedule::endPathStatusInserterWorkers_
private

Definition at line 172 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::GlobalSchedule::pathStatusInserterWorkers_
private

Definition at line 171 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

ProcessContext const* edm::GlobalSchedule::processContext_
private

Definition at line 174 of file GlobalSchedule.h.

edm::propagate_const<WorkerPtr> edm::GlobalSchedule::results_inserter_
private

Definition at line 170 of file GlobalSchedule.h.

Referenced by GlobalSchedule().

WorkerManager edm::GlobalSchedule::workerManager_
private

Definition at line 168 of file GlobalSchedule.h.

Referenced by addToAllWorkers(), beginJob(), endJob(), and GlobalSchedule().