CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void enableEndPaths (bool active)
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventPrincipal &ep, EventSetupImpl const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetupImpl const &es)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResPtrresults ()
 
TrigResConstPtr results () const
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 152 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 160 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 156 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 164 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 155 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 158 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 157 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 154 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 159 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 162 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
BranchIDListHelper branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 137 of file StreamSchedule.cc.

153  : workerManager_(modReg, areg, actions),
154  actReg_(areg),
155  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
157  trig_paths_(),
158  end_paths_(),
159  total_events_(),
160  total_passed_(),
163  streamContext_(streamID_, processContext),
164  endpathsAreActive_(true),
165  skippingEvent_(false) {
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description().moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(),
206  modulesInConfigSet.end(),
207  usedWorkerLabels.begin(),
208  usedWorkerLabels.end(),
209  back_inserter(unusedLabels));
210  std::set<std::string> unscheduledLabels;
211  std::vector<std::string> shouldBeUsedLabels;
212  if (!unusedLabels.empty()) {
213  //Need to
214  // 1) create worker
215  // 2) if it is a WorkerT<EDProducer>, add it to our list
216  // 3) hand list to our delayed reader
217  for (auto const& label : unusedLabels) {
218  bool isTracked;
219  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
220  assert(isTracked);
221  assert(modulePSet != nullptr);
223  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
224  }
225  if (!shouldBeUsedLabels.empty()) {
226  std::ostringstream unusedStream;
227  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
228  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229  itLabelEnd = shouldBeUsedLabels.end();
230  itLabel != itLabelEnd;
231  ++itLabel) {
232  unusedStream << ",'" << *itLabel << "'";
233  }
234  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
235  }
236  }
237  number_of_unscheduled_modules_ = unscheduledLabels.size();
238 
239  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
240 
241  } // StreamSchedule::StreamSchedule

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), label, makePathStatusInserters(), number_of_unscheduled_modules_, geometryDiff::opts, patCandidatesForDimuonsSequences_cff::pathNames, results(), results_inserter_, streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 285 of file StreamSchedule.h.

285 { return workerManager_.actionTable(); }

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 257 of file StreamSchedule.h.

257 { return workerManager_.allWorkers(); }

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 750 of file StreamSchedule.cc.

750  {
751  oLabelsToFill.reserve(trig_paths_.size());
752  std::transform(trig_paths_.begin(),
753  trig_paths_.end(),
754  std::back_inserter(oLabelsToFill),
755  std::bind(&Path::name, std::placeholders::_1));
756  }

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 885 of file StreamSchedule.cc.

885  {
886  using std::placeholders::_1;
888  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
889  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
890  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
891  }

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 261 of file StreamSchedule.h.

261 { return streamContext_; }

References streamContext_.

◆ enableEndPaths()

void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 829 of file StreamSchedule.cc.

829 { endpathsAreActive_ = active; }

References endpathsAreActive_.

◆ endPathsEnabled()

bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 831 of file StreamSchedule.cc.

831 { return endpathsAreActive_; }

References endpathsAreActive_.

◆ endStream()

void edm::StreamSchedule::endStream ( )

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 500 of file StreamSchedule.cc.

506  {
507  PathWorkers tmpworkers;
508  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
509 
510  if (!tmpworkers.empty()) {
511  //EndPaths are not supposed to stop if SkipEvent type exception happens
512  end_paths_.emplace_back(bitpos,
513  name,
514  tmpworkers,
515  TrigResPtr(),
516  actionTable(),
517  actReg_,
519  nullptr,
521  } else {
522  empty_end_paths_.push_back(bitpos);
523  }
524  for (WorkerInPath const& workerInPath : tmpworkers) {
525  addToAllWorkers(workerInPath.getWorker());
526  }
527  }

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 470 of file StreamSchedule.cc.

477  {
478  PathWorkers tmpworkers;
479  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
480 
481  // an empty path will cause an extra bit that is not used
482  if (!tmpworkers.empty()) {
483  trig_paths_.emplace_back(bitpos,
484  name,
485  tmpworkers,
486  trptr,
487  actionTable(),
488  actReg_,
492  } else {
493  empty_trig_paths_.push_back(bitpos);
494  }
495  for (WorkerInPath const& workerInPath : tmpworkers) {
496  addToAllWorkers(workerInPath.getWorker());
497  }
498  }

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 399 of file StreamSchedule.cc.

406  {
407  vstring modnames = proc_pset.getParameter<vstring>(pathName);
408  PathWorkers tmpworkers;
409 
410  unsigned int placeInPath = 0;
411  for (auto const& name : modnames) {
412  //Modules except EDFilters are set to run concurrently by default
413  bool doNotRunConcurrently = false;
415  if (name[0] == '!') {
416  filterAction = WorkerInPath::Veto;
417  } else if (name[0] == '-' or name[0] == '+') {
418  filterAction = WorkerInPath::Ignore;
419  }
420  if (name[0] == '|' or name[0] == '+') {
421  //cms.wait was specified so do not run concurrently
422  doNotRunConcurrently = true;
423  }
424 
426  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
427  moduleLabel.erase(0, 1);
428  }
429 
430  bool isTracked;
431  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
432  if (modpset == nullptr) {
433  std::string pathType("endpath");
434  if (!search_all(endPathNames, pathName)) {
435  pathType = std::string("path");
436  }
438  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
439  << "\"\n please check spelling or remove that label from the path.";
440  }
441  assert(isTracked);
442 
443  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
444  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
445  // We have a filter on an end path, and the filter is not explicitly ignored.
446  // See if the filter is allowed.
447  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
448  if (!search_all(allowed_filters, worker->description().moduleName())) {
449  // Filter is not allowed. Ignore the result, and issue a warning.
450  filterAction = WorkerInPath::Ignore;
451  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description().moduleName()
452  << "' with module label '" << moduleLabel << "' appears on EndPath '"
453  << pathName << "'.\n"
454  << "The return value of the filter will be ignored.\n"
455  << "To suppress this warning, either remove the filter from the endpath,\n"
456  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
457  }
458  }
459  bool runConcurrently = not doNotRunConcurrently;
460  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
461  runConcurrently = false;
462  }
463  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
464  ++placeInPath;
465  }
466 
467  out.swap(tmpworkers);
468  }

References cms::cuda::assert(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr * > &  iExcept,
WaitingTaskHolder  iWait,
EventPrincipal ep,
EventSetupImpl const &  es 
)
private

Definition at line 662 of file StreamSchedule.cc.

665  {
666  if (iExcept) {
667  // Caught exception is propagated via WaitingTaskHolder
668  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
673  edm::printCmsExceptionWarning("SkipEvent", e);
674  *(iExcept.load()) = std::exception_ptr();
675  } else {
676  *(iExcept.load()) = std::current_exception();
677  }
678  } catch (...) {
679  *(iExcept.load()) = std::current_exception();
680  }
681  }
682 
683  if ((not iExcept) and results_->accept()) {
684  ++total_passed_;
685  }
686 
687  if (nullptr != results_inserter_.get()) {
688  // Caught exception is propagated to the caller
689  CMS_SA_ALLOW try {
690  //Even if there was an exception, we need to allow results inserter
691  // to run since some module may be waiting on its results.
692  ParentContext parentContext(&streamContext_);
693  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
694 
695  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
696  } catch (cms::Exception& ex) {
697  if (not iExcept) {
698  if (ex.context().empty()) {
699  std::ostringstream ost;
700  ost << "Processing Event " << ep.id();
701  ex.addContext(ost.str());
702  }
703  iExcept.store(new std::exception_ptr(std::current_exception()));
704  }
705  } catch (...) {
706  if (not iExcept) {
707  iExcept.store(new std::exception_ptr(std::current_exception()));
708  }
709  }
710  }
711  std::exception_ptr ptr;
712  if (iExcept) {
713  ptr = *iExcept.load();
714  }
715  iWait.doneWaiting(ptr);
716  }

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 718 of file StreamSchedule.cc.

718  {
719  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
720 
721  if (iExcept) {
722  //add context information to the exception and print message
723  try {
724  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
725  } catch (cms::Exception& ex) {
726  bool const cleaningUpAfterException = false;
727  if (ex.context().empty()) {
728  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
729  } else {
730  addContextAndPrintException("", ex, cleaningUpAfterException);
731  }
732  iExcept = std::current_exception();
733  }
734 
735  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
736  }
737  // Caught exception is propagated to the caller
738  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
739  if (not iExcept) {
740  iExcept = std::current_exception();
741  }
742  }
743  if (not iExcept) {
745  }
746 
747  return iExcept;
748  }

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 549 of file StreamSchedule.cc.

549  {
550  std::vector<ModuleDescription const*> result;
551  result.reserve(allWorkers().size());
552 
553  for (auto const& worker : allWorkers()) {
554  ModuleDescription const* p = worker->descPtr();
555  result.push_back(p);
556  }
557  return result;
558  }

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 875 of file StreamSchedule.cc.

875  {
876  rep.eventSummary.totalEvents += totalEvents();
877  rep.eventSummary.totalEventsPassed += totalEventsPassed();
878  rep.eventSummary.totalEventsFailed += totalEventsFailed();
879 
880  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
881  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
882  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
883  }

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 243 of file StreamSchedule.cc.

246  {
247  //for now, if have a subProcess, don't allow early delete
248  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
249  if (not allowEarlyDelete)
250  return;
251 
252  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
253  // registered for this job
254  std::multimap<std::string, Worker*> branchToReadingWorker;
255  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
256 
257  //If no delete early items have been specified we don't have to do anything
258  if (branchToReadingWorker.empty()) {
259  return;
260  }
261  const std::vector<std::string> kEmpty;
262  std::map<Worker*, unsigned int> reserveSizeForWorker;
263  unsigned int upperLimitOnReadingWorker = 0;
264  unsigned int upperLimitOnIndicies = 0;
265  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
266 
267  //talk with output modules first
268  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
269  auto comm = iHolder->createOutputModuleCommunicator();
270  if (comm) {
271  if (!branchToReadingWorker.empty()) {
272  //If an OutputModule needs a product, we can't delete it early
273  // so we should remove it from our list
274  SelectedProductsForBranchType const& kept = comm->keptProducts();
275  for (auto const& item : kept[InEvent]) {
276  BranchDescription const& desc = *item.first;
277  auto found = branchToReadingWorker.equal_range(desc.branchName());
278  if (found.first != found.second) {
279  --nUniqueBranchesToDelete;
280  branchToReadingWorker.erase(found.first, found.second);
281  }
282  }
283  }
284  }
285  });
286 
287  if (branchToReadingWorker.empty()) {
288  return;
289  }
290 
291  for (auto w : allWorkers()) {
292  //determine if this module could read a branch we want to delete early
293  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
294  if (nullptr != pset) {
295  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
296  if (not branches.empty()) {
297  ++upperLimitOnReadingWorker;
298  }
299  for (auto const& branch : branches) {
300  auto found = branchToReadingWorker.equal_range(branch);
301  if (found.first != found.second) {
302  ++upperLimitOnIndicies;
303  ++reserveSizeForWorker[w];
304  if (nullptr == found.first->second) {
305  found.first->second = w;
306  } else {
307  branchToReadingWorker.insert(make_pair(found.first->first, w));
308  }
309  }
310  }
311  }
312  }
313  {
314  auto it = branchToReadingWorker.begin();
315  std::vector<std::string> unusedBranches;
316  while (it != branchToReadingWorker.end()) {
317  if (it->second == nullptr) {
318  unusedBranches.push_back(it->first);
319  //erasing the object invalidates the iterator so must advance it first
320  auto temp = it;
321  ++it;
322  branchToReadingWorker.erase(temp);
323  } else {
324  ++it;
325  }
326  }
327  if (not unusedBranches.empty()) {
328  LogWarning l("UnusedProductsForCanDeleteEarly");
329  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331  for (auto const& n : unusedBranches) {
332  l << "\n " << n;
333  }
334  }
335  }
336  if (!branchToReadingWorker.empty()) {
337  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
338  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
339  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
340  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
341  std::string lastBranchName;
342  size_t nextOpenIndex = 0;
343  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
344  for (auto& branchAndWorker : branchToReadingWorker) {
345  if (lastBranchName != branchAndWorker.first) {
346  //have to put back the period we removed earlier in order to get the proper name
347  BranchID bid(branchAndWorker.first + ".");
348  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
349  lastBranchName = branchAndWorker.first;
350  }
351  auto found = alreadySeenWorkers.find(branchAndWorker.second);
352  if (alreadySeenWorkers.end() == found) {
353  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
354  // all the branches that might be read by this worker. However, initially we will only tell the
355  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
356  // EarlyDeleteHelper will automatically advance its internal end pointer.
357  size_t index = nextOpenIndex;
358  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
360  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
361  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
362  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
363  nextOpenIndex += nIndices;
364  } else {
365  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
366  }
367  }
368 
369  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
370  // space needed for each module
371  auto itLast = earlyDeleteHelpers_.begin();
372  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
373  if (itLast->end() != it->begin()) {
374  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
375  unsigned int delta = it->begin() - itLast->end();
376  it->shiftIndexPointers(delta);
377 
379  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
380  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
381  }
382  itLast = it;
383  }
385  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
387 
388  //now tell the paths about the deleters
389  for (auto& p : trig_paths_) {
390  p.setEarlyDeleteHelpers(alreadySeenWorkers);
391  }
392  for (auto& p : end_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
396  }
397  }

References allWorkers(), MicroEventContent_cff::branch, edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, geometryDiff::opts, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w.

Referenced by StreamSchedule().

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 914 of file StreamSchedule.cc.

917  {
918  int bitpos = 0;
919  unsigned int indexEmpty = 0;
920  unsigned int indexOfPath = 0;
921  for (auto& pathStatusInserter : pathStatusInserters) {
922  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
923  WorkerPtr workerPtr(
924  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
925  pathStatusInserterWorkers_.emplace_back(workerPtr);
926  workerPtr->setActivityRegistry(actReg_);
927  addToAllWorkers(workerPtr.get());
928 
929  // A little complexity here because a C++ Path object is not
930  // instantiated and put into end_paths if there are no modules
931  // on the configured path.
932  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
933  ++indexEmpty;
934  } else {
935  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
936  ++indexOfPath;
937  }
938  ++bitpos;
939  }
940 
941  bitpos = 0;
942  indexEmpty = 0;
943  indexOfPath = 0;
944  for (auto& endPathStatusInserter : endPathStatusInserters) {
945  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
946  WorkerPtr workerPtr(
947  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
948  endPathStatusInserterWorkers_.emplace_back(workerPtr);
949  workerPtr->setActivityRegistry(actReg_);
950  addToAllWorkers(workerPtr.get());
951 
952  // A little complexity here because a C++ Path object is not
953  // instantiated and put into end_paths if there are no modules
954  // on the configured path.
955  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
956  ++indexEmpty;
957  } else {
958  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
959  ++indexOfPath;
960  }
961  ++bitpos;
962  }
963  }

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 800 of file StreamSchedule.cc.

802  {
803  descriptions.clear();
804  bool found = false;
805  TrigPaths::const_iterator itFound;
806 
807  if (hint < end_paths_.size()) {
808  itFound = end_paths_.begin() + hint;
809  if (itFound->name() == iEndPathLabel)
810  found = true;
811  }
812  if (!found) {
813  // if the hint did not work, do it the slow way
814  itFound = std::find_if(
815  end_paths_.begin(),
816  end_paths_.end(),
817  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
818  if (itFound != end_paths_.end())
819  found = true;
820  }
821  if (found) {
822  descriptions.reserve(itFound->size());
823  for (size_t i = 0; i < itFound->size(); ++i) {
824  descriptions.push_back(itFound->getWorker(i)->descPtr());
825  }
826  }
827  }

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 771 of file StreamSchedule.cc.

773  {
774  descriptions.clear();
775  bool found = false;
776  TrigPaths::const_iterator itFound;
777 
778  if (hint < trig_paths_.size()) {
779  itFound = trig_paths_.begin() + hint;
780  if (itFound->name() == iPathLabel)
781  found = true;
782  }
783  if (!found) {
784  // if the hint did not work, do it the slow way
785  itFound = std::find_if(
786  trig_paths_.begin(),
787  trig_paths_.end(),
788  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
789  if (itFound != trig_paths_.end())
790  found = true;
791  }
792  if (found) {
793  descriptions.reserve(itFound->size());
794  for (size_t i = 0; i < itFound->size(); ++i) {
795  descriptions.push_back(itFound->getWorker(i)->descPtr());
796  }
797  }
798  }

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 758 of file StreamSchedule.cc.

758  {
759  TrigPaths::const_iterator itFound = std::find_if(
760  trig_paths_.begin(),
761  trig_paths_.end(),
762  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
763  if (itFound != trig_paths_.end()) {
764  oLabelsToFill.reserve(itFound->size());
765  for (size_t i = 0; i < itFound->size(); ++i) {
766  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
767  }
768  }
769  }

References mps_fire::i, edm::Path::name(), and trig_paths_.

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 259 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventPrincipal ep,
EventSetupImpl const &  es,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 560 of file StreamSchedule.cc.

565  {
566  // Caught exception is propagated via WaitingTaskHolder
567  CMS_SA_ALLOW try {
568  this->resetAll();
569 
570  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
571 
572  Traits::setStreamContext(streamContext_, ep);
573  //a service may want to communicate with another service
574  ServiceRegistry::Operate guard(serviceToken);
575  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
576 
577  HLTPathStatus hltPathStatus(hlt::Pass, 0);
578  for (int empty_trig_path : empty_trig_paths_) {
579  results_->at(empty_trig_path) = hltPathStatus;
580  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
581  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
582  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
583  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_);
584  if (except) {
585  iTask.doneWaiting(except);
586  return;
587  }
588  }
589  for (int empty_end_path : empty_end_paths_) {
590  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
591  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
592  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_);
593  if (except) {
594  iTask.doneWaiting(except);
595  return;
596  }
597  }
598 
599  // This call takes care of the unscheduled processing.
601 
602  ++total_events_;
603 
604  //use to give priorities on an error to ones from Paths
605  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
606  auto pathErrorPtr = pathErrorHolder.get();
607  auto allPathsDone = make_waiting_task(
608  tbb::task::allocate_root(),
609  [iTask, this, serviceToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
610  ServiceRegistry::Operate operate(serviceToken);
611 
612  std::exception_ptr ptr;
613  if (pathError->load()) {
614  ptr = *pathError->load();
615  delete pathError->load();
616  }
617  if ((not ptr) and iPtr) {
618  ptr = *iPtr;
619  }
620  iTask.doneWaiting(finishProcessOneEvent(ptr));
621  });
622  //The holder guarantees that if the paths finish before the loop ends
623  // that we do not start too soon. It also guarantees that the task will
624  // run under that condition.
625  WaitingTaskHolder allPathsHolder(allPathsDone);
626 
627  auto pathsDone = make_waiting_task(
628  tbb::task::allocate_root(),
629  [allPathsHolder, pathErrorPtr, &ep, &es, this, serviceToken](std::exception_ptr const* iPtr) mutable {
630  ServiceRegistry::Operate operate(serviceToken);
631 
632  if (iPtr) {
633  //this is used to prioritize this error over one
634  // that happens in EndPath or Accumulate
635  pathErrorPtr->store(new std::exception_ptr(*iPtr));
636  }
637  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es);
638  });
639 
640  //The holder guarantees that if the paths finish before the loop ends
641  // that we do not start too soon. It also guarantees that the task will
642  // run under that condition.
643  WaitingTaskHolder taskHolder(pathsDone);
644 
645  //start end paths first so on single threaded the paths will run first
646  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
647  it->processOneOccurrenceAsync(allPathsDone, ep, es, serviceToken, streamID_, &streamContext_);
648  }
649 
650  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
651  it->processOneOccurrenceAsync(pathsDone, ep, es, serviceToken, streamID_, &streamContext_);
652  }
653 
654  ParentContext parentContext(&streamContext_);
655  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
656  allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
657  } catch (...) {
658  iTask.doneWaiting(std::current_exception());
659  }
660  }

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::MyPrincipal &  principal,
EventSetupImpl const &  eventSetup,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 382 of file StreamSchedule.h.

386  {
387  T::setStreamContext(streamContext_, ep);
388 
389  auto id = ep.id();
390  auto doneTask = make_waiting_task(
391  tbb::task::allocate_root(),
392  [this, iHolder, id, cleaningUpAfterException, token](std::exception_ptr const* iPtr) mutable {
393  std::exception_ptr excpt;
394  if (iPtr) {
395  excpt = *iPtr;
396  //add context information to the exception and print message
397  try {
398  convertException::wrap([&]() { std::rethrow_exception(excpt); });
399  } catch (cms::Exception& ex) {
400  //TODO: should add the transition type info
401  std::ostringstream ost;
402  if (ex.context().empty()) {
403  ost << "Processing " << T::transitionName() << " " << id;
404  }
406  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
407  excpt = std::current_exception();
408  }
409 
411  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
412  }
413  // Caught exception is propagated via WaitingTaskHolder
414  CMS_SA_ALLOW try {
416  T::postScheduleSignal(actReg_.get(), &streamContext_);
417  } catch (...) {
418  if (not excpt) {
419  excpt = std::current_exception();
420  }
421  }
422  iHolder.doneWaiting(excpt);
423  });
424 
425  auto task = make_functor_task(tbb::task::allocate_root(),
426  [this, doneTask, h = WaitingTaskHolder(doneTask), &ep, &es, token]() mutable {
428  // Caught exception is propagated via WaitingTaskHolder
429  CMS_SA_ALLOW try {
430  T::preScheduleSignal(actReg_.get(), &streamContext_);
431 
433  } catch (...) {
434  h.doneWaiting(std::current_exception());
435  return;
436  }
437 
438  for (auto& p : end_paths_) {
439  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
440  }
441 
442  for (auto& p : trig_paths_) {
443  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
444  }
445 
447  doneTask, ep, es, token, streamID_, &streamContext_, &streamContext_);
448  });
449 
450  if (streamID_.value() == 0) {
451  //Enqueueing will start another thread if there is only
452  // one thread in the job. Having stream == 0 use spawn
453  // avoids starting up another thread when there is only one stream.
454  tbb::task::spawn(*task);
455  } else {
456  tbb::task::enqueue(*task);
457  }
458  }

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::ExceptionFromThisContext, triggerObjects_cff::id, edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 533 of file StreamSchedule.cc.

533  {
534  Worker* found = nullptr;
535  for (auto const& worker : allWorkers()) {
536  if (worker->description().moduleLabel() == iLabel) {
537  found = worker;
538  break;
539  }
540  }
541  if (nullptr == found) {
542  return;
543  }
544 
545  iMod->replaceModuleFor(found);
546  found->beginStream(streamID_, streamContext_);
547  }

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 376 of file StreamSchedule.h.

376  {
377  Service<JobReport> reportSvc;
378  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
379  }

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 893 of file StreamSchedule.cc.

893  {
894  skippingEvent_ = false;
895  results_->reset();
896  }

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 900 of file StreamSchedule.cc.

900  {
901  //must be sure we have cleared the count first
902  for (auto& count : earlyDeleteBranchToCount_) {
903  count.count = 0;
904  }
905  //now reset based on how many helpers use that branch
907  ++(earlyDeleteBranchToCount_[index].count);
908  }
909  for (auto& helper : earlyDeleteHelpers_) {
910  helper.reset();
911  }
912  }

References KineDebug3::count(), earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

◆ results() [1/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 330 of file StreamSchedule.h.

330 { return get_underlying_safe(results_); }

References edm::get_underlying_safe(), and results_.

◆ results() [2/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 329 of file StreamSchedule.h.

329 { return get_underlying_safe(results_); }

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 201 of file StreamSchedule.h.

201 { return streamID_; }

References streamID_.

Referenced by StreamSchedule().

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 228 of file StreamSchedule.h.

228 { return total_events_; }

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 236 of file StreamSchedule.h.

236 { return totalEvents() - totalEventsPassed(); }

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 232 of file StreamSchedule.h.

232 { return total_passed_; }

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 354 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 364 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 361 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 349 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 348 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endpathsAreActive_

volatile bool edm::StreamSchedule::endpathsAreActive_
private

Definition at line 372 of file StreamSchedule.h.

Referenced by enableEndPaths(), and endPathsEnabled().

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 344 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 368 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 343 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 340 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 342 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ skippingEvent_

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 373 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 366 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 367 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private
edm::StreamSchedule::fillEndPath
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:500
edm::StreamSchedule::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: StreamSchedule.cc:898
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::PathContext::PathType::kPath
edm::WorkerManager::addToAllWorkers
void addToAllWorkers(Worker *w)
Definition: WorkerManager.cc:119
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:355
edm::StreamSchedule::endPathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
Definition: StreamSchedule.h:344
edm::StreamSchedule::empty_end_paths_
std::vector< int > empty_end_paths_
Definition: StreamSchedule.h:349
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
edm::StreamSchedule::results
TrigResConstPtr results() const
Definition: StreamSchedule.h:329
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::StreamSchedule::makePathStatusInserters
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
Definition: StreamSchedule.cc:914
edm::StreamSchedule::fillWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:399
edm::WorkerManager::resetAll
void resetAll()
Definition: WorkerManager.cc:117
edm::StreamSchedule::workerManager_
WorkerManager workerManager_
Definition: StreamSchedule.h:337
MicroEventContent_cff.branch
branch
Definition: MicroEventContent_cff.py:152
edm::StreamSchedule::streamContext_
StreamContext streamContext_
Definition: StreamSchedule.h:371
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
edm::StreamSchedule::earlyDeleteBranchToCount_
std::vector< BranchToCount > earlyDeleteBranchToCount_
Definition: StreamSchedule.h:354
edm::WorkerManager::processOneOccurrenceAsync
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
Definition: WorkerManager.h:137
edm::StreamSchedule::total_events_
int total_events_
Definition: StreamSchedule.h:366
edm::WorkerManager::addToUnscheduledWorkers
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
Definition: WorkerManager.cc:42
edm::StreamSchedule::PathWorkers
std::vector< WorkerInPath > PathWorkers
Definition: StreamSchedule.h:164
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
edm::StreamSchedule::totalEventsPassed
int totalEventsPassed() const
Definition: StreamSchedule.h:232
edm::fillWorkerSummary
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
Definition: StreamSchedule.cc:873
cms::cuda::assert
assert(be >=bs)
edm::StreamSchedule::allWorkers
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Definition: StreamSchedule.h:257
edm::WorkerManager::actionTable
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:42
edm::WorkerT
Definition: Frameworkfwd.h:54
edm::StreamSchedule::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition: StreamSchedule.h:159
edm::get_underlying
T & get_underlying(propagate_const< T > &)
Definition: propagate_const.h:103
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
groupFilesInBlocks.temp
list temp
Definition: groupFilesInBlocks.py:142
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::StreamSchedule::results_inserter_
edm::propagate_const< WorkerPtr > results_inserter_
Definition: StreamSchedule.h:342
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const::get
element_type const * get() const
Definition: propagate_const.h:64
edm::Worker::kFilter
Definition: Worker.h:86
edm::StreamSchedule::number_of_unscheduled_modules_
unsigned int number_of_unscheduled_modules_
Definition: StreamSchedule.h:368
edm::StreamSchedule::totalEvents
int totalEvents() const
Definition: StreamSchedule.h:228
hltMonBTagIPClient_cfi.pathName
pathName
Definition: hltMonBTagIPClient_cfi.py:5
w
const double w
Definition: UKUtility.cc:23
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::WorkerInPath::Ignore
Definition: WorkerInPath.h:27
edm::WorkerManager::allWorkers
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
Service
WaitingTaskHolder
HcalDetIdTransform::transform
unsigned transform(const HcalDetId &id, unsigned transformCode)
Definition: HcalDetIdTransform.cc:7
edm::StreamSchedule::actionTable
ExceptionToActionTable const & actionTable() const
returns the action table
Definition: StreamSchedule.h:285
edm::InEvent
Definition: BranchType.h:11
h
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
edm::WorkerInPath::Normal
Definition: WorkerInPath.h:27
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::StreamSchedule::results_
edm::propagate_const< TrigResPtr > results_
Definition: StreamSchedule.h:340
edm::StreamSchedule::empty_trig_paths_
std::vector< int > empty_trig_paths_
Definition: StreamSchedule.h:348
edm::StreamSchedule::initializeEarlyDelete
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
Definition: StreamSchedule.cc:243
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::WorkerInPath::FilterAction
FilterAction
Definition: WorkerInPath.h:27
edm::WorkerManager::beginStream
void beginStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:105
edm::get_underlying_safe
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Definition: get_underlying_safe.h:40
edm::ServiceRegistry::Operate
friend class Operate
Definition: ServiceRegistry.h:54
cms::Exception::context
std::list< std::string > const & context() const
Definition: Exception.cc:147
ParameterSet
Definition: Functions.h:16
edm::StreamSchedule::skippingEvent_
std::atomic< bool > skippingEvent_
Definition: StreamSchedule.h:373
edm::StreamSchedule::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: StreamSchedule.h:338
KineDebug3::count
void count()
Definition: KinematicConstrainedVertexUpdatorT.h:21
dumpMFGeometry_cfg.delta
delta
Definition: dumpMFGeometry_cfg.py:25
helper
Definition: helper.py:1
edm::addContextAndPrintException
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
Definition: ExceptionHelpers.cc:11
edm::StreamSchedule::fillTrigPath
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
Definition: StreamSchedule.cc:470
edm::WorkerManager::processAccumulatorsAsync
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: WorkerManager.h:149
writedatasetfile.action
action
Definition: writedatasetfile.py:8
cuy.rep
rep
Definition: cuy.py:1190
edm::StreamSchedule::end_paths_
TrigPaths end_paths_
Definition: StreamSchedule.h:347
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::fillPathSummary
static void fillPathSummary(Path const &path, PathSummary &sum)
Definition: StreamSchedule.cc:841
edm::StreamSchedule::finishedPaths
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetupImpl const &es)
Definition: StreamSchedule.cc:662
edm::StreamSchedule::streamID_
StreamID streamID_
Definition: StreamSchedule.h:370
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:193
patCandidatesForDimuonsSequences_cff.pathNames
pathNames
Definition: patCandidatesForDimuonsSequences_cff.py:109
edm::StreamSchedule::resetAll
void resetAll()
Definition: StreamSchedule.cc:893
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:217
edm::StreamSchedule::earlyDeleteHelpers_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
Definition: StreamSchedule.h:364
edm::SelectedProductsForBranchType
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
Definition: SelectedProducts.h:13
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::StreamSchedule::TrigResPtr
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: StreamSchedule.h:157
edm::Path::name
std::string const & name() const
Definition: Path.h:80
triggerObjects_cff.id
id
Definition: triggerObjects_cff.py:31
T
long double T
Definition: Basic3DVectorLD.h:48
Exception
Definition: hltDiff.cc:246
edm::StreamSchedule::trig_paths_
TrigPaths trig_paths_
Definition: StreamSchedule.h:346
edm::PathContext::PathType::kEndPath
edm::WorkerInPath::Veto
Definition: WorkerInPath.h:27
edm::StreamSchedule::resetEarlyDelete
void resetEarlyDelete()
Definition: StreamSchedule.cc:900
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
vstring
vector< string > vstring
Definition: ExoticaDQM.cc:8
edm::WorkerManager::setupOnDemandSystem
void setupOnDemandSystem(Principal &principal, EventSetupImpl const &es)
Definition: WorkerManager.cc:125
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
edm::WorkerManager::endStream
void endStream(StreamID iID, StreamContext &streamContext)
Definition: WorkerManager.cc:111
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::WorkerManager::getWorker
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
Definition: WorkerManager.cc:33
MillePedeFileConverter_cfg.out
out
Definition: MillePedeFileConverter_cfg.py:31
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
mps_fire.result
result
Definition: mps_fire.py:303
edm::search_all
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
cms::Exception
Definition: Exception.h:70
edm::StreamSchedule::streamID
StreamID streamID() const
Definition: StreamSchedule.h:201
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::StreamSchedule::pathStatusInserterWorkers_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
Definition: StreamSchedule.h:343
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::exception_actions::IgnoreCompletely
Definition: ExceptionActions.h:11
edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
Definition: StreamSchedule.h:361
edm::Path::clearCounters
void clearCounters()
Definition: Path.cc:196
edm::StreamSchedule::finishProcessOneEvent
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
Definition: StreamSchedule.cc:718
edm::StreamSchedule::totalEventsFailed
int totalEventsFailed() const
Definition: StreamSchedule.h:236
edm::pset::Registry::getMapped
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
edm::errors::Configuration
Definition: EDMException.h:36
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
label
const char * label
Definition: PFTauDecayModeTools.cc:11
edm::StreamSchedule::endpathsAreActive_
volatile bool endpathsAreActive_
Definition: StreamSchedule.h:372
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
geometryDiff.opts
opts
Definition: geometryDiff.py:11
edm::StreamSchedule::total_passed_
int total_passed_
Definition: StreamSchedule.h:367
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316