CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void enableEndPaths (bool active)
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResPtr results ()
 
TrigResConstPtr results () const
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 152 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 160 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 156 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 164 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 155 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 158 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 157 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 154 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 159 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 162 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter > inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry > modReg,
ParameterSet & proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry & pregistry,
BranchIDListHelper & branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry > areg,
std::shared_ptr< ProcessConfiguration > processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 138 of file StreamSchedule.cc.

154  : workerManager_(modReg, areg, actions),
155  actReg_(areg),
156  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
158  trig_paths_(),
159  end_paths_(),
160  total_events_(),
161  total_passed_(),
164  streamContext_(streamID_, processContext),
165  endpathsAreActive_(true),
166  skippingEvent_(false) {
167  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
168  bool hasPath = false;
169  std::vector<std::string> const& pathNames = tns.getTrigPaths();
170  std::vector<std::string> const& endPathNames = tns.getEndPaths();
171 
172  int trig_bitpos = 0;
173  trig_paths_.reserve(pathNames.size());
174  for (auto const& trig_name : pathNames) {
175  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
176  ++trig_bitpos;
177  hasPath = true;
178  }
179 
180  if (hasPath) {
181  // the results inserter stands alone
182  inserter->setTrigResultForStream(streamID.value(), results());
183 
184  results_inserter_ = makeInserter(actions, actReg_, inserter);
186  }
187 
188  // fill normal endpaths
189  int bitpos = 0;
190  end_paths_.reserve(endPathNames.size());
191  for (auto const& end_path_name : endPathNames) {
192  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
193  ++bitpos;
194  }
195 
196  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
197 
198  //See if all modules were used
199  std::set<std::string> usedWorkerLabels;
200  for (auto const& worker : allWorkers()) {
201  usedWorkerLabels.insert(worker->description()->moduleLabel());
202  }
203  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
204  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
205  std::vector<std::string> unusedLabels;
206  set_difference(modulesInConfigSet.begin(),
207  modulesInConfigSet.end(),
208  usedWorkerLabels.begin(),
209  usedWorkerLabels.end(),
210  back_inserter(unusedLabels));
211  std::set<std::string> unscheduledLabels;
212  std::vector<std::string> shouldBeUsedLabels;
213  if (!unusedLabels.empty()) {
214  //Need to
215  // 1) create worker
216  // 2) if it is a WorkerT<EDProducer>, add it to our list
217  // 3) hand list to our delayed reader
218  for (auto const& label : unusedLabels) {
219  bool isTracked;
220  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
221  assert(isTracked);
222  assert(modulePSet != nullptr);
224  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
225  }
226  if (!shouldBeUsedLabels.empty()) {
227  std::ostringstream unusedStream;
228  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
229  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
230  itLabelEnd = shouldBeUsedLabels.end();
231  itLabel != itLabelEnd;
232  ++itLabel) {
233  unusedStream << ",'" << *itLabel << "'";
234  }
235  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
236  }
237  }
238  number_of_unscheduled_modules_ = unscheduledLabels.size();
239 
240  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
241 
242  } // StreamSchedule::StreamSchedule

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), label, makePathStatusInserters(), number_of_unscheduled_modules_, geometryDiff::opts, results(), results_inserter_, streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 286 of file StreamSchedule.h.

286 { return workerManager_.actionTable(); }

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker * w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 258 of file StreamSchedule.h.

258 { return workerManager_.allWorkers(); }

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 756 of file StreamSchedule.cc.

756  {
757  oLabelsToFill.reserve(trig_paths_.size());
758  std::transform(trig_paths_.begin(),
759  trig_paths_.end(),
760  std::back_inserter(oLabelsToFill),
761  std::bind(&Path::name, std::placeholders::_1));
762  }

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 891 of file StreamSchedule.cc.

891  {
892  using std::placeholders::_1;
894  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
895  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
896  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
897  }

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 262 of file StreamSchedule.h.

262 { return streamContext_; }

References streamContext_.

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 550 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

◆ enableEndPaths()

void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 835 of file StreamSchedule.cc.

835 { endpathsAreActive_ = active; }

References endpathsAreActive_.

◆ endPathsEnabled()

bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 837 of file StreamSchedule.cc.

837 { return endpathsAreActive_; }

References endpathsAreActive_.

◆ endStream()

void edm::StreamSchedule::endStream ( )

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet & proc_pset,
ProductRegistry & preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 501 of file StreamSchedule.cc.

507  {
508  PathWorkers tmpworkers;
509  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
510 
511  if (!tmpworkers.empty()) {
512  //EndPaths are not supposed to stop if SkipEvent type exception happens
513  end_paths_.emplace_back(bitpos,
514  name,
515  tmpworkers,
516  TrigResPtr(),
517  actionTable(),
518  actReg_,
520  nullptr,
522  } else {
523  empty_end_paths_.push_back(bitpos);
524  }
525  for (WorkerInPath const& workerInPath : tmpworkers) {
526  addToAllWorkers(workerInPath.getWorker());
527  }
528  }

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet & proc_pset,
ProductRegistry & preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 471 of file StreamSchedule.cc.

478  {
479  PathWorkers tmpworkers;
480  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
481 
482  // an empty path will cause an extra bit that is not used
483  if (!tmpworkers.empty()) {
484  trig_paths_.emplace_back(bitpos,
485  name,
486  tmpworkers,
487  trptr,
488  actionTable(),
489  actReg_,
493  } else {
494  empty_trig_paths_.push_back(bitpos);
495  }
496  for (WorkerInPath const& workerInPath : tmpworkers) {
497  addToAllWorkers(workerInPath.getWorker());
498  }
499  }

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet & proc_pset,
ProductRegistry & preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers & out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 400 of file StreamSchedule.cc.

407  {
408  vstring modnames = proc_pset.getParameter<vstring>(pathName);
409  PathWorkers tmpworkers;
410 
411  unsigned int placeInPath = 0;
412  for (auto const& name : modnames) {
413  //Modules except EDFilters are set to run concurrently by default
414  bool doNotRunConcurrently = false;
416  if (name[0] == '!') {
417  filterAction = WorkerInPath::Veto;
418  } else if (name[0] == '-' or name[0] == '+') {
419  filterAction = WorkerInPath::Ignore;
420  }
421  if (name[0] == '|' or name[0] == '+') {
422  //cms.wait was specified so do not run concurrently
423  doNotRunConcurrently = true;
424  }
425 
427  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
428  moduleLabel.erase(0, 1);
429  }
430 
431  bool isTracked;
432  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
433  if (modpset == nullptr) {
434  std::string pathType("endpath");
435  if (!search_all(endPathNames, pathName)) {
436  pathType = std::string("path");
437  }
439  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
440  << "\"\n please check spelling or remove that label from the path.";
441  }
442  assert(isTracked);
443 
444  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
445  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
446  // We have a filter on an end path, and the filter is not explicitly ignored.
447  // See if the filter is allowed.
448  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
449  if (!search_all(allowed_filters, worker->description()->moduleName())) {
450  // Filter is not allowed. Ignore the result, and issue a warning.
451  filterAction = WorkerInPath::Ignore;
452  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
453  << "' with module label '" << moduleLabel << "' appears on EndPath '"
454  << pathName << "'.\n"
455  << "The return value of the filter will be ignored.\n"
456  << "To suppress this warning, either remove the filter from the endpath,\n"
457  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
458  }
459  }
460  bool runConcurrently = not doNotRunConcurrently;
461  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
462  runConcurrently = false;
463  }
464  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
465  ++placeInPath;
466  }
467 
468  out.swap(tmpworkers);
469  }

References cms::cuda::assert(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr * > &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo & info 
)
private

Definition at line 666 of file StreamSchedule.cc.

668  {
669  if (iExcept) {
670  // Caught exception is propagated via WaitingTaskHolder
671  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
676  edm::printCmsExceptionWarning("SkipEvent", e);
677  *(iExcept.load()) = std::exception_ptr();
678  } else {
679  *(iExcept.load()) = std::current_exception();
680  }
681  } catch (...) {
682  *(iExcept.load()) = std::current_exception();
683  }
684  }
685 
686  if ((not iExcept) and results_->accept()) {
687  ++total_passed_;
688  }
689 
690  if (nullptr != results_inserter_.get()) {
691  // Caught exception is propagated to the caller
692  CMS_SA_ALLOW try {
693  //Even if there was an exception, we need to allow results inserter
694  // to run since some module may be waiting on its results.
695  ParentContext parentContext(&streamContext_);
696  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
697 
698  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
699  if (expt) {
700  std::rethrow_exception(expt);
701  }
702  } catch (cms::Exception& ex) {
703  if (not iExcept) {
704  if (ex.context().empty()) {
705  std::ostringstream ost;
706  ost << "Processing Event " << info.principal().id();
707  ex.addContext(ost.str());
708  }
709  iExcept.store(new std::exception_ptr(std::current_exception()));
710  }
711  } catch (...) {
712  if (not iExcept) {
713  iExcept.store(new std::exception_ptr(std::current_exception()));
714  }
715  }
716  }
717  std::exception_ptr ptr;
718  if (iExcept) {
719  ptr = *iExcept.load();
720  }
721  iWait.doneWaiting(ptr);
722  }

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 724 of file StreamSchedule.cc.

724  {
725  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
726 
727  if (iExcept) {
728  //add context information to the exception and print message
729  try {
730  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
731  } catch (cms::Exception& ex) {
732  bool const cleaningUpAfterException = false;
733  if (ex.context().empty()) {
734  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
735  } else {
736  addContextAndPrintException("", ex, cleaningUpAfterException);
737  }
738  iExcept = std::current_exception();
739  }
740 
741  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
742  }
743  // Caught exception is propagated to the caller
744  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
745  if (not iExcept) {
746  iExcept = std::current_exception();
747  }
748  }
749  if (not iExcept) {
751  }
752 
753  return iExcept;
754  }

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 552 of file StreamSchedule.cc.

552  {
553  std::vector<ModuleDescription const*> result;
554  result.reserve(allWorkers().size());
555 
556  for (auto const& worker : allWorkers()) {
557  ModuleDescription const* p = worker->description();
558  result.push_back(p);
559  }
560  return result;
561  }

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport & rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 881 of file StreamSchedule.cc.

881  {
882  rep.eventSummary.totalEvents += totalEvents();
883  rep.eventSummary.totalEventsPassed += totalEventsPassed();
884  rep.eventSummary.totalEventsFailed += totalEventsFailed();
885 
886  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
887  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
888  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
889  }

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry & modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 244 of file StreamSchedule.cc.

247  {
248  //for now, if have a subProcess, don't allow early delete
249  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
250  if (not allowEarlyDelete)
251  return;
252 
253  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
254  // registered for this job
255  std::multimap<std::string, Worker*> branchToReadingWorker;
256  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
257 
258  //If no delete early items have been specified we don't have to do anything
259  if (branchToReadingWorker.empty()) {
260  return;
261  }
262  const std::vector<std::string> kEmpty;
263  std::map<Worker*, unsigned int> reserveSizeForWorker;
264  unsigned int upperLimitOnReadingWorker = 0;
265  unsigned int upperLimitOnIndicies = 0;
266  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
267 
268  //talk with output modules first
269  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
270  auto comm = iHolder->createOutputModuleCommunicator();
271  if (comm) {
272  if (!branchToReadingWorker.empty()) {
273  //If an OutputModule needs a product, we can't delete it early
274  // so we should remove it from our list
275  SelectedProductsForBranchType const& kept = comm->keptProducts();
276  for (auto const& item : kept[InEvent]) {
277  BranchDescription const& desc = *item.first;
278  auto found = branchToReadingWorker.equal_range(desc.branchName());
279  if (found.first != found.second) {
280  --nUniqueBranchesToDelete;
281  branchToReadingWorker.erase(found.first, found.second);
282  }
283  }
284  }
285  }
286  });
287 
288  if (branchToReadingWorker.empty()) {
289  return;
290  }
291 
292  for (auto w : allWorkers()) {
293  //determine if this module could read a branch we want to delete early
294  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
295  if (nullptr != pset) {
296  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
297  if (not branches.empty()) {
298  ++upperLimitOnReadingWorker;
299  }
300  for (auto const& branch : branches) {
301  auto found = branchToReadingWorker.equal_range(branch);
302  if (found.first != found.second) {
303  ++upperLimitOnIndicies;
304  ++reserveSizeForWorker[w];
305  if (nullptr == found.first->second) {
306  found.first->second = w;
307  } else {
308  branchToReadingWorker.insert(make_pair(found.first->first, w));
309  }
310  }
311  }
312  }
313  }
314  {
315  auto it = branchToReadingWorker.begin();
316  std::vector<std::string> unusedBranches;
317  while (it != branchToReadingWorker.end()) {
318  if (it->second == nullptr) {
319  unusedBranches.push_back(it->first);
320  //erasing the object invalidates the iterator so must advance it first
321  auto temp = it;
322  ++it;
323  branchToReadingWorker.erase(temp);
324  } else {
325  ++it;
326  }
327  }
328  if (not unusedBranches.empty()) {
329  LogWarning l("UnusedProductsForCanDeleteEarly");
330  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
331  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332  for (auto const& n : unusedBranches) {
333  l << "\n " << n;
334  }
335  }
336  }
337  if (!branchToReadingWorker.empty()) {
338  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
339  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
340  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
341  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
342  std::string lastBranchName;
343  size_t nextOpenIndex = 0;
344  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
345  for (auto& branchAndWorker : branchToReadingWorker) {
346  if (lastBranchName != branchAndWorker.first) {
347  //have to put back the period we removed earlier in order to get the proper name
348  BranchID bid(branchAndWorker.first + ".");
349  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
350  lastBranchName = branchAndWorker.first;
351  }
352  auto found = alreadySeenWorkers.find(branchAndWorker.second);
353  if (alreadySeenWorkers.end() == found) {
354  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
355  // all the branches that might be read by this worker. However, initially we will only tell the
356  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
357  // EarlyDeleteHelper will automatically advance its internal end pointer.
358  size_t index = nextOpenIndex;
359  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
361  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
362  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
363  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
364  nextOpenIndex += nIndices;
365  } else {
366  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
367  }
368  }
369 
370  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
371  // space needed for each module
372  auto itLast = earlyDeleteHelpers_.begin();
373  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
374  if (itLast->end() != it->begin()) {
375  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
376  unsigned int delta = it->begin() - itLast->end();
377  it->shiftIndexPointers(delta);
378 
380  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
381  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
382  }
383  itLast = it;
384  }
386  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
388 
389  //now tell the paths about the deleters
390  for (auto& p : trig_paths_) {
391  p.setEarlyDeleteHelpers(alreadySeenWorkers);
392  }
393  for (auto& p : end_paths_) {
394  p.setEarlyDeleteHelpers(alreadySeenWorkers);
395  }
397  }
398  }

References allWorkers(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, geometryDiff::opts, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w.

Referenced by StreamSchedule().

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 920 of file StreamSchedule.cc.

923  {  // Wraps every path / end-path status inserter in a Worker, registers it, and attaches it to its Path.
924  int bitpos = 0;  // index of the current inserter within pathStatusInserters
925  unsigned int indexEmpty = 0;  // next entry of empty_trig_paths_ still to be matched
926  unsigned int indexOfPath = 0;  // next element of trig_paths_ still to receive an inserter
927  for (auto& pathStatusInserter : pathStatusInserters) {
928  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
929  WorkerPtr workerPtr(
930  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
931  pathStatusInserterWorkers_.emplace_back(workerPtr);  // one worker is stored per inserter, empty path or not
932  workerPtr->setActivityRegistry(actReg_);
933  addToAllWorkers(workerPtr.get());
934 
935  // A little complexity here because a C++ Path object is not
936  // instantiated and put into trig_paths_ if there are no modules
937  // on the configured path, so positions listed in empty_trig_paths_ are skipped.
938  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
939  ++indexEmpty;  // this position is an empty path: there is no Path object to attach to
940  } else {
941  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
942  ++indexOfPath;
943  }
944  ++bitpos;
945  }
946 
947  bitpos = 0;  // restart the same bookkeeping for the end paths
948  indexEmpty = 0;
949  indexOfPath = 0;
950  for (auto& endPathStatusInserter : endPathStatusInserters) {
951  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
952  WorkerPtr workerPtr(
953  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
954  endPathStatusInserterWorkers_.emplace_back(workerPtr);
955  workerPtr->setActivityRegistry(actReg_);
956  addToAllWorkers(workerPtr.get());
957 
958  // A little complexity here because a C++ Path object is not
959  // instantiated and put into end_paths_ if there are no modules
960  // on the configured path, so positions listed in empty_end_paths_ are skipped.
961  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
962  ++indexEmpty;
963  } else {
964  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());  // end paths get only the worker; the inserter argument is nullptr
965  ++indexOfPath;
966  }
967  ++bitpos;
968  }
969  }

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 806 of file StreamSchedule.cc.

808  {  // Fills descriptions with the ModuleDescription of every worker on the end path named iEndPathLabel.
809  descriptions.clear();  // the output stays empty when no end path matches
810  bool found = false;
811  TrigPaths::const_iterator itFound;
812 
813  if (hint < end_paths_.size()) {  // try the caller-supplied index first
814  itFound = end_paths_.begin() + hint;
815  if (itFound->name() == iEndPathLabel)
816  found = true;
817  }
818  if (!found) {
819  // the hint was out of range or named a different path: fall back to a linear search by name
820  itFound = std::find_if(
821  end_paths_.begin(),
822  end_paths_.end(),
823  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
824  if (itFound != end_paths_.end())
825  found = true;
826  }
827  if (found) {
828  descriptions.reserve(itFound->size());
829  for (size_t i = 0; i < itFound->size(); ++i) {
830  descriptions.push_back(itFound->getWorker(i)->description());  // kept in the path's worker index order
831  }
832  }
833  }

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 777 of file StreamSchedule.cc.

779  {  // Fills descriptions with the ModuleDescription of every worker on the trigger path named iPathLabel.
780  descriptions.clear();  // the output stays empty when no path matches
781  bool found = false;
782  TrigPaths::const_iterator itFound;
783 
784  if (hint < trig_paths_.size()) {  // try the caller-supplied index first
785  itFound = trig_paths_.begin() + hint;
786  if (itFound->name() == iPathLabel)
787  found = true;
788  }
789  if (!found) {
790  // the hint was out of range or named a different path: fall back to a linear search by name
791  itFound = std::find_if(
792  trig_paths_.begin(),
793  trig_paths_.end(),
794  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
795  if (itFound != trig_paths_.end())
796  found = true;
797  }
798  if (found) {
799  descriptions.reserve(itFound->size());
800  for (size_t i = 0; i < itFound->size(); ++i) {
801  descriptions.push_back(itFound->getWorker(i)->description());  // kept in the path's worker index order
802  }
803  }
804  }

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 764 of file StreamSchedule.cc.

764  {  // Appends the module labels of trigger path iPathLabel to oLabelsToFill; does nothing if no path has that name.
765  TrigPaths::const_iterator itFound = std::find_if(
766  trig_paths_.begin(),
767  trig_paths_.end(),
768  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
769  if (itFound != trig_paths_.end()) {
770  oLabelsToFill.reserve(itFound->size());  // note: the vector is not cleared, so existing entries are kept
771  for (size_t i = 0; i < itFound->size(); ++i) {
772  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
773  }
774  }
775  }

References mps_fire::i, edm::Path::name(), and trig_paths_.

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 260 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 563 of file StreamSchedule.cc.

567  {  // Processes one event on this stream: empty paths are handled inline, then end paths, paths and accumulators are started asynchronously.
568  EventPrincipal& ep = info.principal();
569 
570  // Caught exception is propagated via WaitingTaskHolder
571  CMS_SA_ALLOW try {
572  this->resetAll();
573 
574  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
575 
576  Traits::setStreamContext(streamContext_, ep);
577  //a service may want to communicate with another service
578  ServiceRegistry::Operate guard(serviceToken);
579  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
580 
581  HLTPathStatus hltPathStatus(hlt::Pass, 0);  // status given to every position listed in empty_trig_paths_
582  for (int empty_trig_path : empty_trig_paths_) {
583  results_->at(empty_trig_path) = hltPathStatus;
584  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
585  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
586  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
587  info, streamID_, ParentContext(&streamContext_), &streamContext_);
588  if (except) {
589  iTask.doneWaiting(except);  // hand the failure to the caller's task and stop before scheduling anything else
590  return;
591  }
592  }
593  for (int empty_end_path : empty_end_paths_) {
594  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
595  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
596  info, streamID_, ParentContext(&streamContext_), &streamContext_);
597  if (except) {
598  iTask.doneWaiting(except);
599  return;
600  }
601  }
602 
605 
606  ++total_events_;
607 
608  //used to give an error raised by a Path priority over errors from other sources
609  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
610  auto pathErrorPtr = pathErrorHolder.get();  // raw alias captured by pathsDone; ownership of the atomic moves into allPathsDone
611  ServiceWeakToken weakToken = serviceToken;
612  auto allPathsDone = make_waiting_task(
613  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
614  ServiceRegistry::Operate operate(weakToken.lock());
615 
616  std::exception_ptr ptr;
617  if (pathError->load()) {
618  ptr = *pathError->load();
619  delete pathError->load();  // frees the exception_ptr allocated with new in pathsDone
620  }
621  if ((not ptr) and iPtr) {  // only use this task's own error when no path error was recorded
622  ptr = *iPtr;
623  }
624  iTask.doneWaiting(finishProcessOneEvent(ptr));
625  });
626  //The holder guarantees that if the paths finish before the loop ends
627  // that we do not start too soon. It also guarantees that the task will
628  // run under that condition.
629  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
630 
631  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
632  std::exception_ptr const* iPtr) mutable {
633  ServiceRegistry::Operate operate(weakToken.lock());
634 
635  if (iPtr) {
636  //this is used to prioritize this error over one
637  // that happens in EndPath or Accumulate
638  pathErrorPtr->store(new std::exception_ptr(*iPtr));
639  }
640  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
641  });
642 
643  //The holder guarantees that if the paths finish before the loop ends
644  // that we do not start too soon. It also guarantees that the task will
645  // run under that condition.
646  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
647 
648  //start end paths first so on single threaded the paths will run first
649  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
650  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
651  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
652  }
653 
654  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
655  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
656  }
657 
658  ParentContext parentContext(&streamContext_);
659  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
660  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
661  } catch (...) {
662  iTask.doneWaiting(std::current_exception());  // any exception thrown during setup is reported through iTask
663  }
664  }

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 381 of file StreamSchedule.h.

384  {  // Runs the stream transition T over all end paths and trigger paths, then signals iHolder from doneTask.
385  auto const& principal = transitionInfo.principal();
386  T::setStreamContext(streamContext_, principal);
387 
388  auto id = principal.id();
389  ServiceWeakToken weakToken = token;
390  auto doneTask = make_waiting_task(
391  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
392  std::exception_ptr excpt;
393  if (iPtr) {
394  excpt = *iPtr;
395  //add context information to the exception and print message
396  try {
397  convertException::wrap([&]() { std::rethrow_exception(excpt); });
398  } catch (cms::Exception& ex) {
399  //TODO: should add the transition type info
400  std::ostringstream ost;
401  if (ex.context().empty()) {  // the "Processing ..." text is only produced when the exception has no context yet
402  ost << "Processing " << T::transitionName() << " " << id;
403  }
404  ServiceRegistry::Operate op(weakToken.lock());
405  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
406  excpt = std::current_exception();  // keep the exception as it is after context was added
407  }
408 
409  ServiceRegistry::Operate op(weakToken.lock());
410  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
411  }
412  // Caught exception is propagated via WaitingTaskHolder
413  CMS_SA_ALLOW try {
414  ServiceRegistry::Operate op(weakToken.lock());
415  T::postScheduleSignal(actReg_.get(), &streamContext_);
416  } catch (...) {
417  if (not excpt) {  // an earlier exception takes precedence over one thrown by the post-schedule signal
418  excpt = std::current_exception();
419  }
420  }
421  iHolder.doneWaiting(excpt);
422  });
423 
424  auto task = make_functor_task(
425  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
426  auto token = weakToken.lock();
428  // Caught exception is propagated via WaitingTaskHolder
429  CMS_SA_ALLOW try {
430  T::preScheduleSignal(actReg_.get(), &streamContext_);
431 
433  } catch (...) {
434  h.doneWaiting(std::current_exception());  // a failure before any path was started goes straight to doneTask
435  return;
436  }
437 
438  for (auto& p : end_paths_) {
439  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
440  }
441 
442  for (auto& p : trig_paths_) {
443  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
444  }
445 
447  });
448 
449  if (streamID_.value() == 0) {
450  //Enqueueing will start another thread if there is only
451  // one thread in the job. Having stream == 0 use spawn
452  // avoids starting up another thread when there is only one stream.
453  iHolder.group()->run([task]() {
454  TaskSentry s{task};
455  task->execute();
456  });
457  } else {
458  tbb::task_arena arena{tbb::task_arena::attach()};
459  arena.enqueue([task]() {
460  TaskSentry s{task};
461  task->execute();
462  });
463  }
464  }

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, triggerObjects_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

Replaces the module used by the worker with label iLabel with the one held by iMod, then calls beginStream on that worker. Does nothing if no worker has that label.

Definition at line 534 of file StreamSchedule.cc.

534  {  // Swaps in iMod for the first worker whose module label equals iLabel, then restarts its stream.
535  Worker* found = nullptr;
536  for (auto const& worker : allWorkers()) {
537  if (worker->description()->moduleLabel() == iLabel) {
538  found = worker;
539  break;  // only the first matching worker is considered
540  }
541  }
542  if (nullptr == found) {
543  return;  // unknown label: silently do nothing
544  }
545 
546  iMod->replaceModuleFor(found);
547  found->beginStream(streamID_, streamContext_);  // called on the worker after its module was replaced
548  }

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 375 of file StreamSchedule.h.

375  {  // Reports the run and event number of ep to the JobReport service as a skipped event.
376  Service<JobReport> reportSvc;
377  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
378  }

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 899 of file StreamSchedule.cc.

899  {  // Clears the skipping flag and resets the trigger results held in results_.
900  skippingEvent_ = false;
901  results_->reset();
902  }

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 906 of file StreamSchedule.cc.

906  {  // Recomputes the per-branch counts and resets every early-delete helper.
907  //must be sure we have cleared the count first
908  for (auto& count : earlyDeleteBranchToCount_) {
909  count.count = 0;
910  }
911  //now reset based on how many helpers use that branch
913  ++(earlyDeleteBranchToCount_[index].count);  // NOTE(review): listing line 912 (the loop header defining index) is absent from this extract; presumably it iterates earlyDeleteHelperToBranchIndicies_ -- confirm against StreamSchedule.cc
914  }
915  for (auto& helper : earlyDeleteHelpers_) {
916  helper.reset();
917  }
918  }

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

◆ results() [1/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 329 of file StreamSchedule.h.

329 { return get_underlying_safe(results_); }  // non-const overload: returns a reference to the shared_ptr held by results_

References edm::get_underlying_safe(), and results_.

◆ results() [2/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 328 of file StreamSchedule.h.

328 { return get_underlying_safe(results_); }  // const overload: returns a shared_ptr to const trigger results

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

199 { return streamID_; }  // returns a copy of the stream ID stored in streamID_

References streamID_.

Referenced by StreamSchedule().

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

226 { return total_events_; }  // counter incremented once per event in processOneEventAsync()

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 234 of file StreamSchedule.h.

234 { return totalEvents() - totalEventsPassed(); }  // derived value: no separate failure counter is read here

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

230 { return total_passed_; }  // returns the total_passed_ counter

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 353 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 363 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 348 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 347 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endpathsAreActive_

volatile bool edm::StreamSchedule::endpathsAreActive_
private

Definition at line 371 of file StreamSchedule.h.

Referenced by enableEndPaths(), and endPathsEnabled().

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 343 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 367 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 342 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 339 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 341 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ skippingEvent_

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 372 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 365 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 366 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private
edm::StreamSchedule::fillEndPath
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:501
edm::StreamSchedule::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: StreamSchedule.cc:904
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::PathContext::PathType::kPath
edm::WorkerManager::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: WorkerManager.cc:135
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
edm::StreamSchedule::endPathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
Definition: StreamSchedule.h:343
edm::StreamSchedule::empty_end_paths_
std::vector< int > empty_end_paths_
Definition: StreamSchedule.h:348
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
edm::StreamSchedule::results
TrigResConstPtr results() const
Definition: StreamSchedule.h:328
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::StreamSchedule::makePathStatusInserters
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
Definition: StreamSchedule.cc:920
edm::StreamSchedule::fillWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:400
edm::WorkerManager::resetAll
void resetAll()
Definition: WorkerManager.cc:133
edm::StreamSchedule::workerManager_
WorkerManager workerManager_
Definition: StreamSchedule.h:336
MicroEventContent_cff.branch
branch
Definition: MicroEventContent_cff.py:174
edm::StreamSchedule::streamContext_
StreamContext streamContext_
Definition: StreamSchedule.h:370
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
edm::StreamSchedule::earlyDeleteBranchToCount_
std::vector< BranchToCount > earlyDeleteBranchToCount_
Definition: StreamSchedule.h:353
edm::StreamSchedule::total_events_
int total_events_
Definition: StreamSchedule.h:365
edm::WorkerManager::addToUnscheduledWorkers
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
Definition: WorkerManager.cc:52
edm::StreamSchedule::PathWorkers
std::vector< WorkerInPath > PathWorkers
Definition: StreamSchedule.h:164
edm::StreamSchedule::totalEventsPassed
int totalEventsPassed() const
Definition: StreamSchedule.h:230
edm::fillWorkerSummary
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
Definition: StreamSchedule.cc:879
cms::cuda::assert
assert(be >=bs)
edm::propagate_const::get
constexpr element_type const * get() const
Definition: propagate_const.h:64
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::StreamSchedule::allWorkers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: StreamSchedule.h:258
edm::WorkerManager::actionTable
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:43
edm::get_underlying_safe
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Definition: get_underlying_safe.h:41
edm::WorkerT
Definition: Frameworkfwd.h:63
edm::StreamSchedule::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition: StreamSchedule.h:159
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
h
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
groupFilesInBlocks.temp
list temp
Definition: groupFilesInBlocks.py:142
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::StreamSchedule::results_inserter_
edm::propagate_const< WorkerPtr > results_inserter_
Definition: StreamSchedule.h:341
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::make_functor_task
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::Worker::kFilter
Definition: Worker.h:94
edm::StreamSchedule::number_of_unscheduled_modules_
unsigned int number_of_unscheduled_modules_
Definition: StreamSchedule.h:367
edm::StreamSchedule::totalEvents
int totalEvents() const
Definition: StreamSchedule.h:226
hltMonBTagIPClient_cfi.pathName
pathName
Definition: hltMonBTagIPClient_cfi.py:5
w
const double w
Definition: UKUtility.cc:23
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:253
edm::WorkerInPath::Ignore
Definition: WorkerInPath.h:27
edm::WorkerManager::allWorkers
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
Service
WaitingTaskHolder
submitPVResolutionJobs.count
count
Definition: submitPVResolutionJobs.py:352
HcalDetIdTransform::transform
unsigned transform(const HcalDetId &id, unsigned transformCode)
Definition: HcalDetIdTransform.cc:7
edm::StreamSchedule::actionTable
ExceptionToActionTable const & actionTable() const
returns the action table
Definition: StreamSchedule.h:286
edm::InEvent
Definition: BranchType.h:11
h
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
edm::WorkerInPath::Normal
Definition: WorkerInPath.h:27
edm::StreamSchedule::results_
edm::propagate_const< TrigResPtr > results_
Definition: StreamSchedule.h:339
edm::StreamSchedule::empty_trig_paths_
std::vector< int > empty_trig_paths_
Definition: StreamSchedule.h:347
edm::StreamSchedule::initializeEarlyDelete
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
Definition: StreamSchedule.cc:244
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
edm::WorkerInPath::FilterAction
FilterAction
Definition: WorkerInPath.h:27
edm::WorkerManager::beginStream
void beginStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:121
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
ParameterSet
Definition: Functions.h:16
edm::StreamSchedule::skippingEvent_
std::atomic< bool > skippingEvent_
Definition: StreamSchedule.h:372
edm::WorkerManager::setupOnDemandSystem
void setupOnDemandSystem(EventTransitionInfo const &)
Definition: WorkerManager.cc:150
edm::StreamSchedule::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: StreamSchedule.h:337
dumpMFGeometry_cfg.delta
delta
Definition: dumpMFGeometry_cfg.py:25
helper
Definition: helper.py:1
edm::get_underlying
constexpr T & get_underlying(propagate_const< T > &)
Definition: propagate_const.h:103
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
edm::StreamSchedule::fillTrigPath
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:471
writedatasetfile.action
action
Definition: writedatasetfile.py:8
cuy.rep
rep
Definition: cuy.py:1189
edm::StreamSchedule::end_paths_
TrigPaths end_paths_
Definition: StreamSchedule.h:346
edm::WorkerManager::processAccumulatorsAsync
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: WorkerManager.h:119
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::fillPathSummary
static void fillPathSummary(Path const &path, PathSummary &sum)
Definition: StreamSchedule.cc:847
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::StreamSchedule::streamID_
StreamID streamID_
Definition: StreamSchedule.h:369
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
edm::StreamSchedule::resetAll
void resetAll()
Definition: StreamSchedule.cc:899
edm::StreamSchedule::finishedPaths
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
Definition: StreamSchedule.cc:666
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:222
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::StreamSchedule::earlyDeleteHelpers_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
Definition: StreamSchedule.h:363
edm::SelectedProductsForBranchType
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
Definition: SelectedProducts.h:13
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::StreamSchedule::TrigResPtr
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: StreamSchedule.h:157
edm::Path::name
std::string const & name() const
Definition: Path.h:74
triggerObjects_cff.id
id
Definition: triggerObjects_cff.py:29
T
long double T
Definition: Basic3DVectorLD.h:48
Exception
Definition: hltDiff.cc:245
edm::StreamSchedule::trig_paths_
TrigPaths trig_paths_
Definition: StreamSchedule.h:345
edm::PathContext::PathType::kEndPath
edm::WorkerInPath::Veto
Definition: WorkerInPath.h:27
edm::StreamSchedule::resetEarlyDelete
void resetEarlyDelete()
Definition: StreamSchedule.cc:906
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
vstring
vector< string > vstring
Definition: ExoticaDQM.cc:8
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
edm::WorkerManager::setupResolvers
void setupResolvers(Principal &principal)
Definition: WorkerManager.cc:141
edm::WorkerManager::endStream
void endStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:127
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::WorkerManager::getWorker
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
Definition: WorkerManager.cc:43
MillePedeFileConverter_cfg.out
out
Definition: MillePedeFileConverter_cfg.py:31
edm::WorkerManager::processOneOccurrenceAsync
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
Definition: WorkerManager.h:108
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
mps_fire.result
result
Definition: mps_fire.py:311
edm::search_all
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
cms::Exception
Definition: Exception.h:70
edm::StreamSchedule::streamID
StreamID streamID() const
Definition: StreamSchedule.h:199
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::StreamSchedule::pathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
Definition: StreamSchedule.h:342
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::exception_actions::IgnoreCompletely
Definition: ExceptionActions.h:11
edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
Definition: StreamSchedule.h:360
edm::Path::clearCounters
void clearCounters()
Definition: Path.cc:198
edm::WorkerManager::deleteModuleIfExists
void deleteModuleIfExists(std::string const &moduleLabel)
Definition: WorkerManager.cc:33
edm::StreamSchedule::finishProcessOneEvent
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
Definition: StreamSchedule.cc:724
edm::StreamSchedule::totalEventsFailed
int totalEventsFailed() const
Definition: StreamSchedule.h:234
edm::pset::Registry::getMapped
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
edm::errors::Configuration
Definition: EDMException.h:36
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
label
const char * label
Definition: PFTauDecayModeTools.cc:11
edm::StreamSchedule::endpathsAreActive_
volatile bool endpathsAreActive_
Definition: StreamSchedule.h:371
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
geometryDiff.opts
opts
Definition: geometryDiff.py:11
edm::StreamSchedule::total_passed_
int total_passed_
Definition: StreamSchedule.h:366
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316