CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkers () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 154 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 162 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 158 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 166 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 157 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 160 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 159 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 156 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 161 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 164 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 349 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, dqm-mbProfile::log, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
373  streamContext_(streamID_, processContext) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
380  std::unordered_set<std::string> conditionalModules;
381 
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper,
394  conditionalModules);
395  ++trig_bitpos;
396  hasPath = true;
397  }
398 
399  if (hasPath) {
400  // the results inserter stands alone
401  inserter->setTrigResultForStream(streamID.value(), results());
402 
403  results_inserter_ = makeInserter(actions, actReg_, inserter);
405  }
406 
407  // fill normal endpaths
408  int bitpos = 0;
409  end_paths_.reserve(endPathNames.size());
410  for (auto const& end_path_name : endPathNames) {
411  fillEndPath(proc_pset,
412  preg,
413  &prealloc,
414  processConfiguration,
415  bitpos,
416  end_path_name,
417  endPathNames,
418  conditionalTaskHelper,
419  conditionalModules);
420  ++bitpos;
421  }
422 
423  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
424 
425  //See if all modules were used
426  std::set<std::string> usedWorkerLabels;
427  for (auto const& worker : allWorkers()) {
428  usedWorkerLabels.insert(worker->description()->moduleLabel());
429  }
430  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
431  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
432  std::vector<std::string> unusedLabels;
433  set_difference(modulesInConfigSet.begin(),
434  modulesInConfigSet.end(),
435  usedWorkerLabels.begin(),
436  usedWorkerLabels.end(),
437  back_inserter(unusedLabels));
438  std::set<std::string> unscheduledLabels;
439  std::vector<std::string> shouldBeUsedLabels;
440  if (!unusedLabels.empty()) {
441  //Need to
442  // 1) create worker
443  // 2) if it is a WorkerT<EDProducer>, add it to our list
444  // 3) hand list to our delayed reader
445  for (auto const& label : unusedLabels) {
446  bool isTracked;
447  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
448  assert(isTracked);
449  assert(modulePSet != nullptr);
451  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
452  }
453  if (!shouldBeUsedLabels.empty()) {
454  std::ostringstream unusedStream;
455  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
456  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
457  itLabelEnd = shouldBeUsedLabels.end();
458  itLabel != itLabelEnd;
459  ++itLabel) {
460  unusedStream << ",'" << *itLabel << "'";
461  }
462  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
463  }
464  }
465  number_of_unscheduled_modules_ = unscheduledLabels.size();
466 
467  // Print conditional modules that were not consumed in any of their associated Paths
468  if (streamID.value() == 0 and not conditionalModules.empty()) {
469  // Intersection of unscheduled and ConditionalTask modules gives
470  // directly the set of conditional modules that were not
471  // consumed by anything in the Paths associated to the
472  // corresponding ConditionalTask.
473  std::vector<std::string_view> labelsToPrint;
474  std::copy_if(
475  unscheduledLabels.begin(),
476  unscheduledLabels.end(),
477  std::back_inserter(labelsToPrint),
478  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
479 
480  if (not labelsToPrint.empty()) {
481  edm::LogWarning log("NonConsumedConditionalModules");
482  log << "The following modules were part of some ConditionalTask, but were not\n"
483  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
484  << "was associated. Perhaps they should be either removed from the\n"
485  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
486  for (auto const& modLabel : labelsToPrint) {
487  log.format("\n {}", modLabel);
488  }
489  }
490  }
491  } // StreamSchedule::StreamSchedule
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
TrigResConstPtr results() const
StreamContext streamContext_
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 292 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

292 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:93

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 256 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

256 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:88

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1192 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1192  {
1193  oLabelsToFill.reserve(trig_paths_.size());
1194  std::transform(trig_paths_.begin(),
1195  trig_paths_.end(),
1196  std::back_inserter(oLabelsToFill),
1197  std::bind(&Path::name, std::placeholders::_1));
1198  }
std::string const & name() const
Definition: Path.h:73
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 961 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1324 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1324  {
1325  using std::placeholders::_1;
1327  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1328  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1329  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1330  }
void clearCounters()
Definition: Worker.h:232
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:183

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 261 of file StreamSchedule.h.

References streamContext_.

261 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 981 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 963 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 923 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

931  {
932  PathWorkers tmpworkers;
933  fillWorkers(proc_pset,
934  preg,
935  prealloc,
936  processConfiguration,
937  name,
938  true,
939  tmpworkers,
940  endPathNames,
941  conditionalTaskHelper,
942  allConditionalModules);
943 
944  if (!tmpworkers.empty()) {
945  end_paths_.emplace_back(bitpos,
946  name,
947  tmpworkers,
948  TrigResPtr(),
949  actionTable(),
950  actReg_,
953  } else {
954  empty_end_paths_.push_back(bitpos);
955  }
956  for (WorkerInPath const& workerInPath : tmpworkers) {
957  addToAllWorkers(workerInPath.getWorker());
958  }
959  }
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 889 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

898  {
899  PathWorkers tmpworkers;
900  fillWorkers(proc_pset,
901  preg,
902  prealloc,
903  processConfiguration,
904  name,
905  false,
906  tmpworkers,
907  endPathNames,
908  conditionalTaskHelper,
909  allConditionalModules);
910 
911  // an empty path will cause an extra bit that is not used
912  if (!tmpworkers.empty()) {
913  trig_paths_.emplace_back(
914  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
915  } else {
916  empty_trig_paths_.push_back(bitpos);
917  }
918  for (WorkerInPath const& workerInPath : tmpworkers) {
919  addToAllWorkers(workerInPath.getWorker());
920  }
921  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 782 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

791  {
792  vstring modnames = proc_pset.getParameter<vstring>(pathName);
793  PathWorkers tmpworkers;
794 
795  //Pull out ConditionalTask modules
796  auto condRange = findConditionalTaskModulesRange(modnames);
797 
798  std::unordered_set<std::string> conditionalmods;
799  //An EDAlias may be redirecting to a module on a ConditionalTask
800  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
801  std::unordered_map<std::string, unsigned int> conditionalModOrder;
802  if (condRange.first != condRange.second) {
803  for (auto it = condRange.first; it != condRange.second; ++it) {
804  // ordering needs to skip the # token in the path list
805  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
806  }
807  //the last entry should be ignored since it is required to be "@"
808  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
809  std::make_move_iterator(condRange.second));
810 
811  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
812  modnames.erase(std::prev(condRange.first), modnames.end());
813 
814  // Make a union of all conditional modules from all Paths
815  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
816  }
817 
818  unsigned int placeInPath = 0;
819  for (auto const& name : modnames) {
820  //Modules except EDFilters are set to run concurrently by default
821  bool doNotRunConcurrently = false;
823  if (name[0] == '!') {
824  filterAction = WorkerInPath::Veto;
825  } else if (name[0] == '-' or name[0] == '+') {
826  filterAction = WorkerInPath::Ignore;
827  }
828  if (name[0] == '|' or name[0] == '+') {
829  //cms.wait was specified so do not run concurrently
830  doNotRunConcurrently = true;
831  }
832 
834  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
835  moduleLabel.erase(0, 1);
836  }
837 
838  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
839  if (worker == nullptr) {
840  std::string pathType("endpath");
841  if (!search_all(endPathNames, pathName)) {
842  pathType = std::string("path");
843  }
845  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
846  << "\"\n please check spelling or remove that label from the path.";
847  }
848 
849  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
850  // We have a filter on an end path, and the filter is not explicitly ignored.
851  // See if the filter is allowed.
852  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
853  if (!search_all(allowed_filters, worker->description()->moduleName())) {
854  // Filter is not allowed. Ignore the result, and issue a warning.
855  filterAction = WorkerInPath::Ignore;
856  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
857  << "' with module label '" << moduleLabel << "' appears on EndPath '"
858  << pathName << "'.\n"
859  << "The return value of the filter will be ignored.\n"
860  << "To suppress this warning, either remove the filter from the endpath,\n"
861  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
862  }
863  }
864  bool runConcurrently = not doNotRunConcurrently;
865  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
866  runConcurrently = false;
867  }
868 
869  auto condModules = tryToPlaceConditionalModules(worker,
870  conditionalmods,
871  conditionalModsBranches,
872  conditionalTaskHelper.aliasMap(),
873  proc_pset,
874  preg,
875  prealloc,
876  processConfiguration);
877  for (auto condMod : condModules) {
878  tmpworkers.emplace_back(
879  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
880  }
881 
882  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
883  ++placeInPath;
884  }
885 
886  out.swap(tmpworkers);
887  }
vector< string > vstring
Definition: ExoticaDQM.cc:7
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo info 
)
private

Definition at line 1103 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, streamContext_, streamID_, total_passed_, and edm::exception_actions::TryToContinue.

Referenced by processOneEventAsync().

1105  {
1106  if (iExcept) {
1107  // Caught exception is propagated via WaitingTaskHolder
1108  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1112  edm::printCmsExceptionWarning("TryToContinue", e);
1113  *(iExcept.load()) = std::exception_ptr();
1114  } else {
1115  *(iExcept.load()) = std::current_exception();
1116  }
1117  } catch (...) {
1118  *(iExcept.load()) = std::current_exception();
1119  }
1120  }
1121 
1122  if ((not iExcept) and results_->accept()) {
1123  ++total_passed_;
1124  }
1125 
1126  if (nullptr != results_inserter_.get()) {
1127  // Caught exception is propagated to the caller
1128  CMS_SA_ALLOW try {
1129  //Even if there was an exception, we need to allow results inserter
1130  // to run since some module may be waiting on its results.
1131  ParentContext parentContext(&streamContext_);
1132  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1133 
1134  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1135  if (expt) {
1136  std::rethrow_exception(expt);
1137  }
1138  } catch (cms::Exception& ex) {
1139  if (not iExcept) {
1140  if (ex.context().empty()) {
1141  std::ostringstream ost;
1142  ost << "Processing Event " << info.principal().id();
1143  ex.addContext(ost.str());
1144  }
1145  iExcept.store(new std::exception_ptr(std::current_exception()));
1146  }
1147  } catch (...) {
1148  if (not iExcept) {
1149  iExcept.store(new std::exception_ptr(std::current_exception()));
1150  }
1151  }
1152  }
1153  std::exception_ptr ptr;
1154  if (iExcept) {
1155  ptr = *iExcept.load();
1156  }
1157  iWait.doneWaiting(ptr);
1158  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:169
std::list< std::string > const & context() const
Definition: Exception.cc:151
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1160 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1160  {
1161  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1162 
1163  if (iExcept) {
1164  //add context information to the exception and print message
1165  try {
1166  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1167  } catch (cms::Exception& ex) {
1168  bool const cleaningUpAfterException = false;
1169  if (ex.context().empty()) {
1170  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1171  } else {
1172  addContextAndPrintException("", ex, cleaningUpAfterException);
1173  }
1174  iExcept = std::current_exception();
1175  }
1176 
1177  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1178  }
1179  // Caught exception is propagated to the caller
1180  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1181  if (not iExcept) {
1182  iExcept = std::current_exception();
1183  }
1184  }
1185  if (not iExcept) {
1186  resetEarlyDelete();
1187  }
1188 
1189  return iExcept;
1190  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 983 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, and mps_fire::result.

983  {
984  std::vector<ModuleDescription const*> result;
985  result.reserve(allWorkers().size());
986 
987  for (auto const& worker : allWorkers()) {
988  ModuleDescription const* p = worker->description();
989  result.push_back(p);
990  }
991  return result;
992  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1314 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1314  {
1315  rep.eventSummary.totalEvents += totalEvents();
1316  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1317  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1318 
1319  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1320  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1321  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1322  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 493 of file StreamSchedule.cc.

References allWorkers(), cms::cuda::assert(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), dqm-mbProfile::format, newFWLiteAna::found, edm::InEvent, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, B2GTnPMonitor_cfi::item, ALPAKA_ACCELERATOR_NAMESPACE::pixelClustering::pixelStatus::kEmpty, MainPageGenerator::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

497  {
498  // setup the list with those products actually registered for this job
499  std::multimap<std::string, Worker*> branchToReadingWorker;
500  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
501 
502  const std::vector<std::string> kEmpty;
503  std::map<Worker*, unsigned int> reserveSizeForWorker;
504  unsigned int upperLimitOnReadingWorker = 0;
505  unsigned int upperLimitOnIndicies = 0;
506  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
507 
508  //talk with output modules first
509  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
510  auto comm = iHolder->createOutputModuleCommunicator();
511  if (comm) {
512  if (!branchToReadingWorker.empty()) {
513  //If an OutputModule needs a product, we can't delete it early
514  // so we should remove it from our list
515  SelectedProductsForBranchType const& kept = comm->keptProducts();
516  for (auto const& item : kept[InEvent]) {
517  BranchDescription const& desc = *item.first;
518  auto found = branchToReadingWorker.equal_range(desc.branchName());
519  if (found.first != found.second) {
520  --nUniqueBranchesToDelete;
521  branchToReadingWorker.erase(found.first, found.second);
522  }
523  }
524  }
525  }
526  });
527 
528  if (branchToReadingWorker.empty()) {
529  return;
530  }
531 
532  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
533  for (auto w : allWorkers()) {
534  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
535  continue;
536  }
537  //determine if this module could read a branch we want to delete early
538  auto consumes = w->consumesInfo();
539  if (not consumes.empty()) {
540  bool foundAtLeastOneMatchingBranch = false;
541  for (auto const& product : consumes) {
542  std::string branch = fmt::format("{}_{}_{}_{}",
543  product.type().friendlyClassName(),
544  product.label().data(),
545  product.instance().data(),
546  product.process().data());
547  {
548  //Handle case where worker directly consumes product
549  auto found = branchToReadingWorker.end();
550  if (product.process().empty()) {
551  auto startFound = branchToReadingWorker.lower_bound(branch);
552  if (startFound != branchToReadingWorker.end()) {
553  if (startFound->first.substr(0, branch.size()) == branch) {
554  //match all processNames here, even if it means multiple matches will happen
555  found = startFound;
556  }
557  }
558  } else {
559  auto exactFound = branchToReadingWorker.equal_range(branch);
560  if (exactFound.first != exactFound.second) {
561  found = exactFound.first;
562  }
563  }
564  if (found != branchToReadingWorker.end()) {
565  if (not foundAtLeastOneMatchingBranch) {
566  ++upperLimitOnReadingWorker;
567  foundAtLeastOneMatchingBranch = true;
568  }
569  ++upperLimitOnIndicies;
570  ++reserveSizeForWorker[w];
571  if (nullptr == found->second) {
572  found->second = w;
573  } else {
574  branchToReadingWorker.insert(make_pair(found->first, w));
575  }
576  }
577  }
578  {
579  //Handle case where indirectly consumes product
580  auto found = referencesToBranches.end();
581  if (product.process().empty()) {
582  auto startFound = referencesToBranches.lower_bound(branch);
583  if (startFound != referencesToBranches.end()) {
584  if (startFound->first.substr(0, branch.size()) == branch) {
585  //match all processNames here, even if it means multiple matches will happen
586  found = startFound;
587  }
588  }
589  } else {
590  //can match exactly
591  auto exactFound = referencesToBranches.equal_range(branch);
592  if (exactFound.first != exactFound.second) {
593  found = exactFound.first;
594  }
595  }
596  if (found != referencesToBranches.end()) {
597  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
598  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
599  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
600  continue;
601  }
602  if (not foundAtLeastOneMatchingBranch) {
603  ++upperLimitOnReadingWorker;
604  foundAtLeastOneMatchingBranch = true;
605  }
606  ++upperLimitOnIndicies;
607  ++reserveSizeForWorker[w];
608  if (nullptr == foundInBranchToReadingWorker->second) {
609  foundInBranchToReadingWorker->second = w;
610  } else {
611  branchToReadingWorker.insert(make_pair(itr->second, w));
612  }
613  }
614  }
615  }
616  }
617  }
618  }
619  {
620  auto it = branchToReadingWorker.begin();
621  std::vector<std::string> unusedBranches;
622  while (it != branchToReadingWorker.end()) {
623  if (it->second == nullptr) {
624  unusedBranches.push_back(it->first);
625  //erasing the object invalidates the iterator so must advance it first
626  auto temp = it;
627  ++it;
628  branchToReadingWorker.erase(temp);
629  } else {
630  ++it;
631  }
632  }
633  if (not unusedBranches.empty()) {
634  LogWarning l("UnusedProductsForCanDeleteEarly");
635  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
636  " If possible, remove the producer from the job.";
637  for (auto const& n : unusedBranches) {
638  l << "\n " << n;
639  }
640  }
641  }
642  if (!branchToReadingWorker.empty()) {
643  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
644  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
645  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
646  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
647  std::string lastBranchName;
648  size_t nextOpenIndex = 0;
649  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
650  for (auto& branchAndWorker : branchToReadingWorker) {
651  if (lastBranchName != branchAndWorker.first) {
652  //have to put back the period we removed earlier in order to get the proper name
653  BranchID bid(branchAndWorker.first + ".");
654  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
655  lastBranchName = branchAndWorker.first;
656  }
657  auto found = alreadySeenWorkers.find(branchAndWorker.second);
658  if (alreadySeenWorkers.end() == found) {
659  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
660  // all the branches that might be read by this worker. However, initially we will only tell the
661  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
662  // EarlyDeleteHelper will automatically advance its internal end pointer.
663  size_t index = nextOpenIndex;
664  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
667  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
668  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
669  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
670  nextOpenIndex += nIndices;
671  } else {
672  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
673  }
674  }
675 
676  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
677  // space needed for each module
678  auto itLast = earlyDeleteHelpers_.begin();
679  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
680  if (itLast->end() != it->begin()) {
681  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
682  unsigned int delta = it->begin() - itLast->end();
683  it->shiftIndexPointers(delta);
684 
686  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
687  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
688  }
689  itLast = it;
690  }
692  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
694 
695  //now tell the paths about the deleters
696  for (auto& p : trig_paths_) {
697  p.setEarlyDeleteHelpers(alreadySeenWorkers);
698  }
699  for (auto& p : end_paths_) {
700  p.setEarlyDeleteHelpers(alreadySeenWorkers);
701  }
703  }
704  }
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
assert(be >=bs)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1350 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1353  {
1354  int bitpos = 0;
1355  unsigned int indexEmpty = 0;
1356  unsigned int indexOfPath = 0;
1357  for (auto& pathStatusInserter : pathStatusInserters) {
1358  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1359  WorkerPtr workerPtr(
1360  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1361  pathStatusInserterWorkers_.emplace_back(workerPtr);
1362  workerPtr->setActivityRegistry(actReg_);
1363  addToAllWorkers(workerPtr.get());
1364 
1365  // A little complexity here because a C++ Path object is not
1366  // instantiated and put into end_paths if there are no modules
1367  // on the configured path.
1368  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1369  ++indexEmpty;
1370  } else {
1371  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1372  ++indexOfPath;
1373  }
1374  ++bitpos;
1375  }
1376 
1377  bitpos = 0;
1378  indexEmpty = 0;
1379  indexOfPath = 0;
1380  for (auto& endPathStatusInserter : endPathStatusInserters) {
1381  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1382  WorkerPtr workerPtr(
1383  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1384  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1385  workerPtr->setActivityRegistry(actReg_);
1386  addToAllWorkers(workerPtr.get());
1387 
1388  // A little complexity here because a C++ Path object is not
1389  // instantiated and put into end_paths if there are no modules
1390  // on the configured path.
1391  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1392  ++indexEmpty;
1393  } else {
1394  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1395  ++indexOfPath;
1396  }
1397  ++bitpos;
1398  }
1399  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1242 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1244  {
1245  descriptions.clear();
1246  bool found = false;
1247  TrigPaths::const_iterator itFound;
1248 
1249  if (hint < end_paths_.size()) {
1250  itFound = end_paths_.begin() + hint;
1251  if (itFound->name() == iEndPathLabel)
1252  found = true;
1253  }
1254  if (!found) {
1255  // if the hint did not work, do it the slow way
1256  itFound = std::find_if(
1257  end_paths_.begin(),
1258  end_paths_.end(),
1259  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1260  if (itFound != end_paths_.end())
1261  found = true;
1262  }
1263  if (found) {
1264  descriptions.reserve(itFound->size());
1265  for (size_t i = 0; i < itFound->size(); ++i) {
1266  descriptions.push_back(itFound->getWorker(i)->description());
1267  }
1268  }
1269  }
std::string const & name() const
Definition: Path.h:73

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1213 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1215  {
1216  descriptions.clear();
1217  bool found = false;
1218  TrigPaths::const_iterator itFound;
1219 
1220  if (hint < trig_paths_.size()) {
1221  itFound = trig_paths_.begin() + hint;
1222  if (itFound->name() == iPathLabel)
1223  found = true;
1224  }
1225  if (!found) {
1226  // if the hint did not work, do it the slow way
1227  itFound = std::find_if(
1228  trig_paths_.begin(),
1229  trig_paths_.end(),
1230  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1231  if (itFound != trig_paths_.end())
1232  found = true;
1233  }
1234  if (found) {
1235  descriptions.reserve(itFound->size());
1236  for (size_t i = 0; i < itFound->size(); ++i) {
1237  descriptions.push_back(itFound->getWorker(i)->description());
1238  }
1239  }
1240  }
std::string const & name() const
Definition: Path.h:73

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1200 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1200  {
1201  TrigPaths::const_iterator itFound = std::find_if(
1202  trig_paths_.begin(),
1203  trig_paths_.end(),
1204  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1205  if (itFound != trig_paths_.end()) {
1206  oLabelsToFill.reserve(itFound->size());
1207  for (size_t i = 0; i < itFound->size(); ++i) {
1208  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1209  }
1210  }
1211  }
std::string const & name() const
Definition: Path.h:73

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 259 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 994 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

998  {
999  EventPrincipal& ep = info.principal();
1000 
1001  // Caught exception is propagated via WaitingTaskHolder
1002  CMS_SA_ALLOW try {
1003  this->resetAll();
1004 
1005  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1006 
1007  Traits::setStreamContext(streamContext_, ep);
1008  //a service may want to communicate with another service
1009  ServiceRegistry::Operate guard(serviceToken);
1010  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1011 
1012  // Data dependencies need to be set up before marking empty
1013  // (End)Paths complete in case something consumes the status of
1014  // the empty (EndPath)
1017 
1018  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1019  for (int empty_trig_path : empty_trig_paths_) {
1020  results_->at(empty_trig_path) = hltPathStatus;
1021  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1022  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1023  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1024  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1025  if (except) {
1026  iTask.doneWaiting(except);
1027  return;
1028  }
1029  }
1030  if (not endPathStatusInserterWorkers_.empty()) {
1031  for (int empty_end_path : empty_end_paths_) {
1032  std::exception_ptr except =
1033  endPathStatusInserterWorkers_[empty_end_path]
1034  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1035  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1036  if (except) {
1037  iTask.doneWaiting(except);
1038  return;
1039  }
1040  }
1041  }
1042 
1043  ++total_events_;
1044 
1045  //use to give priorities on an error to ones from Paths
1046  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1047  auto pathErrorPtr = pathErrorHolder.get();
1048  ServiceWeakToken weakToken = serviceToken;
1049  auto allPathsDone = make_waiting_task(
1050  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1051  ServiceRegistry::Operate operate(weakToken.lock());
1052 
1053  std::exception_ptr ptr;
1054  if (pathError->load()) {
1055  ptr = *pathError->load();
1056  delete pathError->load();
1057  }
1058  if ((not ptr) and iPtr) {
1059  ptr = *iPtr;
1060  }
1061  iTask.doneWaiting(finishProcessOneEvent(ptr));
1062  });
1063  //The holder guarantees that if the paths finish before the loop ends
1064  // that we do not start too soon. It also guarantees that the task will
1065  // run under that condition.
1066  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1067 
1068  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1069  std::exception_ptr const* iPtr) mutable {
1070  ServiceRegistry::Operate operate(weakToken.lock());
1071 
1072  if (iPtr) {
1073  //this is used to prioritize this error over one
1074  // that happens in EndPath or Accumulate
1075  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1076  }
1077  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1078  });
1079 
1080  //The holder guarantees that if the paths finish before the loop ends
1081  // that we do not start too soon. It also guarantees that the task will
1082  // run under that condition.
1083  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1084 
1085  //start end paths first so on single threaded the paths will run first
1086  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1087  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1088  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1089  }
1090 
1091  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1092  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1093  }
1094 
1095  ParentContext parentContext(&streamContext_);
1096  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1097  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1098  } catch (...) {
1099  iTask.doneWaiting(std::current_exception());
1100  }
1101  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 396 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, l1ctLayer2EG_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), findAndChange::op, AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::transitionName(), trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

399  {
400  auto const& principal = transitionInfo.principal();
401  T::setStreamContext(streamContext_, principal);
402 
403  auto id = principal.id();
404  ServiceWeakToken weakToken = token;
405  auto doneTask = make_waiting_task(
406  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
407  std::exception_ptr excpt;
408  if (iPtr) {
409  excpt = *iPtr;
410  //add context information to the exception and print message
411  try {
412  convertException::wrap([&]() { std::rethrow_exception(excpt); });
413  } catch (cms::Exception& ex) {
414  //TODO: should add the transition type info
415  std::ostringstream ost;
416  if (ex.context().empty()) {
417  ost << "Processing " << T::transitionName() << " " << id;
418  }
419  ServiceRegistry::Operate op(weakToken.lock());
420  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
421  excpt = std::current_exception();
422  }
423 
424  ServiceRegistry::Operate op(weakToken.lock());
425  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
426  }
427  // Caught exception is propagated via WaitingTaskHolder
428  CMS_SA_ALLOW try {
429  ServiceRegistry::Operate op(weakToken.lock());
430  T::postScheduleSignal(actReg_.get(), &streamContext_);
431  } catch (...) {
432  if (not excpt) {
433  excpt = std::current_exception();
434  }
435  }
436  iHolder.doneWaiting(excpt);
437  });
438 
439  auto task = make_functor_task(
440  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
441  auto token = weakToken.lock();
443  // Caught exception is propagated via WaitingTaskHolder
444  CMS_SA_ALLOW try {
445  T::preScheduleSignal(actReg_.get(), &streamContext_);
446 
448  } catch (...) {
449  h.doneWaiting(std::current_exception());
450  return;
451  }
452 
453  for (auto& p : end_paths_) {
454  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
455  }
456 
457  for (auto& p : trig_paths_) {
458  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
459  }
460 
462  });
463 
464  if (streamID_.value() == 0) {
465  //Enqueueing will start another thread if there is only
466  // one thread in the job. Having stream == 0 use spawn
467  // avoids starting up another thread when there is only one stream.
468  iHolder.group()->run([task]() {
469  TaskSentry s{task};
470  task->execute();
471  });
472  } else {
473  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
474  arena.enqueue([task]() {
475  TaskSentry s{task};
476  task->execute();
477  });
478  }
479  }
std::string_view transitionName(GlobalContext::Transition)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 965 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

965  {
966  Worker* found = nullptr;
967  for (auto const& worker : allWorkers()) {
968  if (worker->description()->moduleLabel() == iLabel) {
969  found = worker;
970  break;
971  }
972  }
973  if (nullptr == found) {
974  return;
975  }
976 
977  iMod->replaceModuleFor(found);
978  found->beginStream(streamID_, streamContext_);
979  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 390 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

390  {
391  Service<JobReport> reportSvc;
392  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
393  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1332 of file StreamSchedule.cc.

References results_.

Referenced by processOneEventAsync().

1332 { results_->reset(); }
edm::propagate_const< TrigResPtr > results_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1336 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1336  {
1337  //must be sure we have cleared the count first
1338  for (auto& count : earlyDeleteBranchToCount_) {
1339  count.count = 0;
1340  }
1341  //now reset based on how many helpers use that branch
1343  ++(earlyDeleteBranchToCount_[index].count);
1344  }
1345  for (auto& helper : earlyDeleteHelpers_) {
1346  helper.reset();
1347  }
1348  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 345 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

345 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 346 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

346 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

199 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

226 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 234 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

234 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

230 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 706 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, AlCaHLTBitMon_QueryRunRegistry::string, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

714  {
715  std::vector<Worker*> returnValue;
716  auto const& consumesInfo = worker->consumesInfo();
717  auto moduleLabel = worker->description()->moduleLabel();
718  using namespace productholderindexhelper;
719  for (auto const& ci : consumesInfo) {
720  if (not ci.skipCurrentProcess() and
721  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
722  auto productModuleLabel = std::string(ci.label());
723  bool productFromConditionalModule = false;
724  auto itFound = conditionalModules.find(productModuleLabel);
725  if (itFound == conditionalModules.end()) {
726  //Check to see if this was an alias
727  //note that aliasMap was previously filtered so only the conditional modules remain there
728  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
729  if (foundAlias) {
730  productModuleLabel = *foundAlias;
731  productFromConditionalModule = true;
732  itFound = conditionalModules.find(productModuleLabel);
733  //check that the alias-for conditional module has not been used
734  if (itFound == conditionalModules.end()) {
735  continue;
736  }
737  }
738  } else {
739  //need to check the rest of the data product info
740  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
741  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
742  if (itBranch->second->productInstanceName() == ci.instance()) {
743  if (ci.kindOfType() == PRODUCT_TYPE) {
744  if (ci.type() == itBranch->second->unwrappedTypeID()) {
745  productFromConditionalModule = true;
746  break;
747  }
748  } else {
749  //this is a view
751  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
752  productFromConditionalModule = true;
753  break;
754  }
755  }
756  }
757  }
758  }
759  if (productFromConditionalModule) {
760  auto condWorker =
761  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
762  assert(condWorker);
763 
764  conditionalModules.erase(itFound);
765 
766  auto dependents = tryToPlaceConditionalModules(condWorker,
767  conditionalModules,
768  conditionalModuleBranches,
769  aliasMap,
770  proc_pset,
771  preg,
772  prealloc,
773  processConfiguration);
774  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
775  returnValue.push_back(condWorker);
776  }
777  }
778  }
779  return returnValue;
780  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)

◆ unscheduledWorkers()

AllWorkers const& edm::StreamSchedule::unscheduledWorkers ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManager_.

AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:89
WorkerManager workerManager_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 370 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 380 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 377 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 365 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 364 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 384 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 359 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 358 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 382 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 383 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private