CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkers () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 154 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 162 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 158 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 166 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 157 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 160 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 159 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 156 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 161 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 164 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 349 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, dqm-mbProfile::log, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
373  streamContext_(streamID_, processContext) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
380  std::unordered_set<std::string> conditionalModules;
381 
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper,
394  conditionalModules);
395  ++trig_bitpos;
396  hasPath = true;
397  }
398 
399  if (hasPath) {
400  // the results inserter stands alone
401  inserter->setTrigResultForStream(streamID.value(), results());
402 
403  results_inserter_ = makeInserter(actions, actReg_, inserter);
405  }
406 
407  // fill normal endpaths
408  int bitpos = 0;
409  end_paths_.reserve(endPathNames.size());
410  for (auto const& end_path_name : endPathNames) {
411  fillEndPath(proc_pset,
412  preg,
413  &prealloc,
414  processConfiguration,
415  bitpos,
416  end_path_name,
417  endPathNames,
418  conditionalTaskHelper,
419  conditionalModules);
420  ++bitpos;
421  }
422 
423  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
424 
425  //See if all modules were used
426  std::set<std::string> usedWorkerLabels;
427  for (auto const& worker : allWorkers()) {
428  usedWorkerLabels.insert(worker->description()->moduleLabel());
429  }
430  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
431  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
432  std::vector<std::string> unusedLabels;
433  set_difference(modulesInConfigSet.begin(),
434  modulesInConfigSet.end(),
435  usedWorkerLabels.begin(),
436  usedWorkerLabels.end(),
437  back_inserter(unusedLabels));
438  std::set<std::string> unscheduledLabels;
439  std::vector<std::string> shouldBeUsedLabels;
440  if (!unusedLabels.empty()) {
441  //Need to
442  // 1) create worker
443  // 2) if it is a WorkerT<EDProducer>, add it to our list
444  // 3) hand list to our delayed reader
445  for (auto const& label : unusedLabels) {
446  bool isTracked;
447  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
448  assert(isTracked);
449  assert(modulePSet != nullptr);
451  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
452  }
453  if (!shouldBeUsedLabels.empty()) {
454  std::ostringstream unusedStream;
455  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
456  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
457  itLabelEnd = shouldBeUsedLabels.end();
458  itLabel != itLabelEnd;
459  ++itLabel) {
460  unusedStream << ",'" << *itLabel << "'";
461  }
462  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
463  }
464  }
465  number_of_unscheduled_modules_ = unscheduledLabels.size();
466 
467  // Print conditional modules that were not consumed in any of their associated Paths
468  if (streamID.value() == 0 and not conditionalModules.empty()) {
469  // Intersection of unscheduled and ConditionalTask modules gives
470  // directly the set of conditional modules that were not
471  // consumed by anything in the Paths associated to the
472  // corresponding ConditionalTask.
473  std::vector<std::string_view> labelsToPrint;
474  std::copy_if(
475  unscheduledLabels.begin(),
476  unscheduledLabels.end(),
477  std::back_inserter(labelsToPrint),
478  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
479 
480  if (not labelsToPrint.empty()) {
481  edm::LogWarning log("NonConsumedConditionalModules");
482  log << "The following modules were part of some ConditionalTask, but were not\n"
483  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
484  << "was associated. Perhaps they should be either removed from the\n"
485  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
486  for (auto const& modLabel : labelsToPrint) {
487  log.format("\n {}", modLabel);
488  }
489  }
490  }
491  } // StreamSchedule::StreamSchedule
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
TrigResConstPtr results() const
StreamContext streamContext_
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 292 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

292 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:93

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 256 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

256 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:88

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1189 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1189  {
1190  oLabelsToFill.reserve(trig_paths_.size());
1191  std::transform(trig_paths_.begin(),
1192  trig_paths_.end(),
1193  std::back_inserter(oLabelsToFill),
1194  std::bind(&Path::name, std::placeholders::_1));
1195  }
std::string const & name() const
Definition: Path.h:73
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 961 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1321 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1321  {
1322  using std::placeholders::_1;
1324  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1325  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1326  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1327  }
void clearCounters()
Definition: Worker.h:232
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:183

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 261 of file StreamSchedule.h.

References streamContext_.

261 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 981 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 963 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 923 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

931  {
932  PathWorkers tmpworkers;
933  fillWorkers(proc_pset,
934  preg,
935  prealloc,
936  processConfiguration,
937  name,
938  true,
939  tmpworkers,
940  endPathNames,
941  conditionalTaskHelper,
942  allConditionalModules);
943 
944  if (!tmpworkers.empty()) {
945  end_paths_.emplace_back(bitpos,
946  name,
947  tmpworkers,
948  TrigResPtr(),
949  actionTable(),
950  actReg_,
953  } else {
954  empty_end_paths_.push_back(bitpos);
955  }
956  for (WorkerInPath const& workerInPath : tmpworkers) {
957  addToAllWorkers(workerInPath.getWorker());
958  }
959  }
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 889 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

898  {
899  PathWorkers tmpworkers;
900  fillWorkers(proc_pset,
901  preg,
902  prealloc,
903  processConfiguration,
904  name,
905  false,
906  tmpworkers,
907  endPathNames,
908  conditionalTaskHelper,
909  allConditionalModules);
910 
911  // an empty path will cause an extra bit that is not used
912  if (!tmpworkers.empty()) {
913  trig_paths_.emplace_back(
914  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
915  } else {
916  empty_trig_paths_.push_back(bitpos);
917  }
918  for (WorkerInPath const& workerInPath : tmpworkers) {
919  addToAllWorkers(workerInPath.getWorker());
920  }
921  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 782 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

791  {
792  vstring modnames = proc_pset.getParameter<vstring>(pathName);
793  PathWorkers tmpworkers;
794 
795  //Pull out ConditionalTask modules
796  auto condRange = findConditionalTaskModulesRange(modnames);
797 
798  std::unordered_set<std::string> conditionalmods;
799  //An EDAlias may be redirecting to a module on a ConditionalTask
800  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
801  std::unordered_map<std::string, unsigned int> conditionalModOrder;
802  if (condRange.first != condRange.second) {
803  for (auto it = condRange.first; it != condRange.second; ++it) {
804  // ordering needs to skip the # token in the path list
805  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
806  }
807  //the last entry should be ignored since it is required to be "@"
808  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
809  std::make_move_iterator(condRange.second));
810 
811  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
812  modnames.erase(std::prev(condRange.first), modnames.end());
813 
814  // Make a union of all conditional modules from all Paths
815  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
816  }
817 
818  unsigned int placeInPath = 0;
819  for (auto const& name : modnames) {
820  //Modules except EDFilters are set to run concurrently by default
821  bool doNotRunConcurrently = false;
823  if (name[0] == '!') {
824  filterAction = WorkerInPath::Veto;
825  } else if (name[0] == '-' or name[0] == '+') {
826  filterAction = WorkerInPath::Ignore;
827  }
828  if (name[0] == '|' or name[0] == '+') {
829  //cms.wait was specified so do not run concurrently
830  doNotRunConcurrently = true;
831  }
832 
834  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
835  moduleLabel.erase(0, 1);
836  }
837 
838  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
839  if (worker == nullptr) {
840  std::string pathType("endpath");
841  if (!search_all(endPathNames, pathName)) {
842  pathType = std::string("path");
843  }
845  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
846  << "\"\n please check spelling or remove that label from the path.";
847  }
848 
849  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
850  // We have a filter on an end path, and the filter is not explicitly ignored.
851  // See if the filter is allowed.
852  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
853  if (!search_all(allowed_filters, worker->description()->moduleName())) {
854  // Filter is not allowed. Ignore the result, and issue a warning.
855  filterAction = WorkerInPath::Ignore;
856  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
857  << "' with module label '" << moduleLabel << "' appears on EndPath '"
858  << pathName << "'.\n"
859  << "The return value of the filter will be ignored.\n"
860  << "To suppress this warning, either remove the filter from the endpath,\n"
861  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
862  }
863  }
864  bool runConcurrently = not doNotRunConcurrently;
865  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
866  runConcurrently = false;
867  }
868 
869  auto condModules = tryToPlaceConditionalModules(worker,
870  conditionalmods,
871  conditionalModsBranches,
872  conditionalTaskHelper.aliasMap(),
873  proc_pset,
874  preg,
875  prealloc,
876  processConfiguration);
877  for (auto condMod : condModules) {
878  tmpworkers.emplace_back(
879  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
880  }
881 
882  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
883  ++placeInPath;
884  }
885 
886  out.swap(tmpworkers);
887  }
vector< string > vstring
Definition: ExoticaDQM.cc:7
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo info 
)
private

Definition at line 1100 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, streamContext_, streamID_, total_passed_, and edm::exception_actions::TryToContinue.

Referenced by processOneEventAsync().

1102  {
1103  if (iExcept) {
1104  // Caught exception is propagated via WaitingTaskHolder
1105  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1109  edm::printCmsExceptionWarning("TryToContinue", e);
1110  *(iExcept.load()) = std::exception_ptr();
1111  } else {
1112  *(iExcept.load()) = std::current_exception();
1113  }
1114  } catch (...) {
1115  *(iExcept.load()) = std::current_exception();
1116  }
1117  }
1118 
1119  if ((not iExcept) and results_->accept()) {
1120  ++total_passed_;
1121  }
1122 
1123  if (nullptr != results_inserter_.get()) {
1124  // Caught exception is propagated to the caller
1125  CMS_SA_ALLOW try {
1126  //Even if there was an exception, we need to allow results inserter
1127  // to run since some module may be waiting on its results.
1128  ParentContext parentContext(&streamContext_);
1129  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1130 
1131  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1132  if (expt) {
1133  std::rethrow_exception(expt);
1134  }
1135  } catch (cms::Exception& ex) {
1136  if (not iExcept) {
1137  if (ex.context().empty()) {
1138  std::ostringstream ost;
1139  ost << "Processing Event " << info.principal().id();
1140  ex.addContext(ost.str());
1141  }
1142  iExcept.store(new std::exception_ptr(std::current_exception()));
1143  }
1144  } catch (...) {
1145  if (not iExcept) {
1146  iExcept.store(new std::exception_ptr(std::current_exception()));
1147  }
1148  }
1149  }
1150  std::exception_ptr ptr;
1151  if (iExcept) {
1152  ptr = *iExcept.load();
1153  }
1154  iWait.doneWaiting(ptr);
1155  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:169
std::list< std::string > const & context() const
Definition: Exception.cc:151
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1157 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1157  {
1158  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1159 
1160  if (iExcept) {
1161  //add context information to the exception and print message
1162  try {
1163  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1164  } catch (cms::Exception& ex) {
1165  bool const cleaningUpAfterException = false;
1166  if (ex.context().empty()) {
1167  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1168  } else {
1169  addContextAndPrintException("", ex, cleaningUpAfterException);
1170  }
1171  iExcept = std::current_exception();
1172  }
1173 
1174  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1175  }
1176  // Caught exception is propagated to the caller
1177  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1178  if (not iExcept) {
1179  iExcept = std::current_exception();
1180  }
1181  }
1182  if (not iExcept) {
1183  resetEarlyDelete();
1184  }
1185 
1186  return iExcept;
1187  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 983 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, and mps_fire::result.

983  {
984  std::vector<ModuleDescription const*> result;
985  result.reserve(allWorkers().size());
986 
987  for (auto const& worker : allWorkers()) {
988  ModuleDescription const* p = worker->description();
989  result.push_back(p);
990  }
991  return result;
992  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1311 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1311  {
1312  rep.eventSummary.totalEvents += totalEvents();
1313  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1314  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1315 
1316  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1317  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1318  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1319  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 493 of file StreamSchedule.cc.

References allWorkers(), cms::cuda::assert(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), dqm-mbProfile::format, newFWLiteAna::found, edm::InEvent, B2GTnPMonitor_cfi::item, gpuClustering::pixelStatus::kEmpty, MainPageGenerator::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

497  {
498  // setup the list with those products actually registered for this job
499  std::multimap<std::string, Worker*> branchToReadingWorker;
500  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
501 
502  const std::vector<std::string> kEmpty;
503  std::map<Worker*, unsigned int> reserveSizeForWorker;
504  unsigned int upperLimitOnReadingWorker = 0;
505  unsigned int upperLimitOnIndicies = 0;
506  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
507 
508  //talk with output modules first
509  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
510  auto comm = iHolder->createOutputModuleCommunicator();
511  if (comm) {
512  if (!branchToReadingWorker.empty()) {
513  //If an OutputModule needs a product, we can't delete it early
514  // so we should remove it from our list
515  SelectedProductsForBranchType const& kept = comm->keptProducts();
516  for (auto const& item : kept[InEvent]) {
517  BranchDescription const& desc = *item.first;
518  auto found = branchToReadingWorker.equal_range(desc.branchName());
519  if (found.first != found.second) {
520  --nUniqueBranchesToDelete;
521  branchToReadingWorker.erase(found.first, found.second);
522  }
523  }
524  }
525  }
526  });
527 
528  if (branchToReadingWorker.empty()) {
529  return;
530  }
531 
532  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
533  for (auto w : allWorkers()) {
534  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
535  continue;
536  }
537  //determine if this module could read a branch we want to delete early
538  auto consumes = w->consumesInfo();
539  if (not consumes.empty()) {
540  bool foundAtLeastOneMatchingBranch = false;
541  for (auto const& product : consumes) {
542  std::string branch = fmt::format("{}_{}_{}_{}",
543  product.type().friendlyClassName(),
544  product.label().data(),
545  product.instance().data(),
546  product.process().data());
547  {
548  //Handle case where worker directly consumes product
549  auto found = branchToReadingWorker.end();
550  if (product.process().empty()) {
551  auto startFound = branchToReadingWorker.lower_bound(branch);
552  if (startFound != branchToReadingWorker.end()) {
553  if (startFound->first.substr(0, branch.size()) == branch) {
554  //match all processNames here, even if it means multiple matches will happen
555  found = startFound;
556  }
557  }
558  } else {
559  auto exactFound = branchToReadingWorker.equal_range(branch);
560  if (exactFound.first != exactFound.second) {
561  found = exactFound.first;
562  }
563  }
564  if (found != branchToReadingWorker.end()) {
565  if (not foundAtLeastOneMatchingBranch) {
566  ++upperLimitOnReadingWorker;
567  foundAtLeastOneMatchingBranch = true;
568  }
569  ++upperLimitOnIndicies;
570  ++reserveSizeForWorker[w];
571  if (nullptr == found->second) {
572  found->second = w;
573  } else {
574  branchToReadingWorker.insert(make_pair(found->first, w));
575  }
576  }
577  }
578  {
579  //Handle case where indirectly consumes product
580  auto found = referencesToBranches.end();
581  if (product.process().empty()) {
582  auto startFound = referencesToBranches.lower_bound(branch);
583  if (startFound != referencesToBranches.end()) {
584  if (startFound->first.substr(0, branch.size()) == branch) {
585  //match all processNames here, even if it means multiple matches will happen
586  found = startFound;
587  }
588  }
589  } else {
590  //can match exactly
591  auto exactFound = referencesToBranches.equal_range(branch);
592  if (exactFound.first != exactFound.second) {
593  found = exactFound.first;
594  }
595  }
596  if (found != referencesToBranches.end()) {
597  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
598  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
599  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
600  continue;
601  }
602  if (not foundAtLeastOneMatchingBranch) {
603  ++upperLimitOnReadingWorker;
604  foundAtLeastOneMatchingBranch = true;
605  }
606  ++upperLimitOnIndicies;
607  ++reserveSizeForWorker[w];
608  if (nullptr == foundInBranchToReadingWorker->second) {
609  foundInBranchToReadingWorker->second = w;
610  } else {
611  branchToReadingWorker.insert(make_pair(itr->second, w));
612  }
613  }
614  }
615  }
616  }
617  }
618  }
619  {
620  auto it = branchToReadingWorker.begin();
621  std::vector<std::string> unusedBranches;
622  while (it != branchToReadingWorker.end()) {
623  if (it->second == nullptr) {
624  unusedBranches.push_back(it->first);
625  //erasing the object invalidates the iterator so must advance it first
626  auto temp = it;
627  ++it;
628  branchToReadingWorker.erase(temp);
629  } else {
630  ++it;
631  }
632  }
633  if (not unusedBranches.empty()) {
634  LogWarning l("UnusedProductsForCanDeleteEarly");
635  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
636  " If possible, remove the producer from the job.";
637  for (auto const& n : unusedBranches) {
638  l << "\n " << n;
639  }
640  }
641  }
642  if (!branchToReadingWorker.empty()) {
643  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
644  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
645  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
646  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
647  std::string lastBranchName;
648  size_t nextOpenIndex = 0;
649  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
650  for (auto& branchAndWorker : branchToReadingWorker) {
651  if (lastBranchName != branchAndWorker.first) {
652  //have to put back the period we removed earlier in order to get the proper name
653  BranchID bid(branchAndWorker.first + ".");
654  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
655  lastBranchName = branchAndWorker.first;
656  }
657  auto found = alreadySeenWorkers.find(branchAndWorker.second);
658  if (alreadySeenWorkers.end() == found) {
659  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
660  // all the branches that might be read by this worker. However, initially we will only tell the
661  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
662  // EarlyDeleteHelper will automatically advance its internal end pointer.
663  size_t index = nextOpenIndex;
664  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
667  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
668  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
669  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
670  nextOpenIndex += nIndices;
671  } else {
672  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
673  }
674  }
675 
676  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
677  // space needed for each module
678  auto itLast = earlyDeleteHelpers_.begin();
679  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
680  if (itLast->end() != it->begin()) {
681  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
682  unsigned int delta = it->begin() - itLast->end();
683  it->shiftIndexPointers(delta);
684 
686  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
687  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
688  }
689  itLast = it;
690  }
692  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
694 
695  //now tell the paths about the deleters
696  for (auto& p : trig_paths_) {
697  p.setEarlyDeleteHelpers(alreadySeenWorkers);
698  }
699  for (auto& p : end_paths_) {
700  p.setEarlyDeleteHelpers(alreadySeenWorkers);
701  }
703  }
704  }
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
assert(be >=bs)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1347 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1350  {
1351  int bitpos = 0;
1352  unsigned int indexEmpty = 0;
1353  unsigned int indexOfPath = 0;
1354  for (auto& pathStatusInserter : pathStatusInserters) {
1355  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1356  WorkerPtr workerPtr(
1357  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1358  pathStatusInserterWorkers_.emplace_back(workerPtr);
1359  workerPtr->setActivityRegistry(actReg_);
1360  addToAllWorkers(workerPtr.get());
1361 
1362  // A little complexity here because a C++ Path object is not
1363  // instantiated and put into end_paths if there are no modules
1364  // on the configured path.
1365  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1366  ++indexEmpty;
1367  } else {
1368  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1369  ++indexOfPath;
1370  }
1371  ++bitpos;
1372  }
1373 
1374  bitpos = 0;
1375  indexEmpty = 0;
1376  indexOfPath = 0;
1377  for (auto& endPathStatusInserter : endPathStatusInserters) {
1378  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1379  WorkerPtr workerPtr(
1380  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1381  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1382  workerPtr->setActivityRegistry(actReg_);
1383  addToAllWorkers(workerPtr.get());
1384 
1385  // A little complexity here because a C++ Path object is not
1386  // instantiated and put into end_paths if there are no modules
1387  // on the configured path.
1388  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1389  ++indexEmpty;
1390  } else {
1391  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1392  ++indexOfPath;
1393  }
1394  ++bitpos;
1395  }
1396  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1239 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1241  {
1242  descriptions.clear();
1243  bool found = false;
1244  TrigPaths::const_iterator itFound;
1245 
1246  if (hint < end_paths_.size()) {
1247  itFound = end_paths_.begin() + hint;
1248  if (itFound->name() == iEndPathLabel)
1249  found = true;
1250  }
1251  if (!found) {
1252  // if the hint did not work, do it the slow way
1253  itFound = std::find_if(
1254  end_paths_.begin(),
1255  end_paths_.end(),
1256  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1257  if (itFound != end_paths_.end())
1258  found = true;
1259  }
1260  if (found) {
1261  descriptions.reserve(itFound->size());
1262  for (size_t i = 0; i < itFound->size(); ++i) {
1263  descriptions.push_back(itFound->getWorker(i)->description());
1264  }
1265  }
1266  }
std::string const & name() const
Definition: Path.h:73

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1210 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1212  {
1213  descriptions.clear();
1214  bool found = false;
1215  TrigPaths::const_iterator itFound;
1216 
1217  if (hint < trig_paths_.size()) {
1218  itFound = trig_paths_.begin() + hint;
1219  if (itFound->name() == iPathLabel)
1220  found = true;
1221  }
1222  if (!found) {
1223  // if the hint did not work, do it the slow way
1224  itFound = std::find_if(
1225  trig_paths_.begin(),
1226  trig_paths_.end(),
1227  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1228  if (itFound != trig_paths_.end())
1229  found = true;
1230  }
1231  if (found) {
1232  descriptions.reserve(itFound->size());
1233  for (size_t i = 0; i < itFound->size(); ++i) {
1234  descriptions.push_back(itFound->getWorker(i)->description());
1235  }
1236  }
1237  }
std::string const & name() const
Definition: Path.h:73

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1197 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1197  {
1198  TrigPaths::const_iterator itFound = std::find_if(
1199  trig_paths_.begin(),
1200  trig_paths_.end(),
1201  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1202  if (itFound != trig_paths_.end()) {
1203  oLabelsToFill.reserve(itFound->size());
1204  for (size_t i = 0; i < itFound->size(); ++i) {
1205  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1206  }
1207  }
1208  }
std::string const & name() const
Definition: Path.h:73

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 259 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 994 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

998  {
999  EventPrincipal& ep = info.principal();
1000 
1001  // Caught exception is propagated via WaitingTaskHolder
1002  CMS_SA_ALLOW try {
1003  this->resetAll();
1004 
1005  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1006 
1007  Traits::setStreamContext(streamContext_, ep);
1008  //a service may want to communicate with another service
1009  ServiceRegistry::Operate guard(serviceToken);
1010  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1011 
1012  // Data dependencies need to be set up before marking empty
1013  // (End)Paths complete in case something consumes the status of
1014  // the empty (EndPath)
1017 
1018  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1019  for (int empty_trig_path : empty_trig_paths_) {
1020  results_->at(empty_trig_path) = hltPathStatus;
1021  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1022  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1023  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1024  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1025  if (except) {
1026  iTask.doneWaiting(except);
1027  return;
1028  }
1029  }
1030  for (int empty_end_path : empty_end_paths_) {
1031  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
1032  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1033  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1034  if (except) {
1035  iTask.doneWaiting(except);
1036  return;
1037  }
1038  }
1039 
1040  ++total_events_;
1041 
1042  //use to give priorities on an error to ones from Paths
1043  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1044  auto pathErrorPtr = pathErrorHolder.get();
1045  ServiceWeakToken weakToken = serviceToken;
1046  auto allPathsDone = make_waiting_task(
1047  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1048  ServiceRegistry::Operate operate(weakToken.lock());
1049 
1050  std::exception_ptr ptr;
1051  if (pathError->load()) {
1052  ptr = *pathError->load();
1053  delete pathError->load();
1054  }
1055  if ((not ptr) and iPtr) {
1056  ptr = *iPtr;
1057  }
1058  iTask.doneWaiting(finishProcessOneEvent(ptr));
1059  });
1060  //The holder guarantees that if the paths finish before the loop ends
1061  // that we do not start too soon. It also guarantees that the task will
1062  // run under that condition.
1063  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1064 
1065  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1066  std::exception_ptr const* iPtr) mutable {
1067  ServiceRegistry::Operate operate(weakToken.lock());
1068 
1069  if (iPtr) {
1070  //this is used to prioritize this error over one
1071  // that happens in EndPath or Accumulate
1072  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1073  }
1074  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1075  });
1076 
1077  //The holder guarantees that if the paths finish before the loop ends
1078  // that we do not start too soon. It also guarantees that the task will
1079  // run under that condition.
1080  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1081 
1082  //start end paths first so on single threaded the paths will run first
1083  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1084  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1085  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1086  }
1087 
1088  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1089  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1090  }
1091 
1092  ParentContext parentContext(&streamContext_);
1093  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1094  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1095  } catch (...) {
1096  iTask.doneWaiting(std::current_exception());
1097  }
1098  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 396 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, l1ctLayer2EG_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), findAndChange::op, AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::transitionName(), trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

399  {
400  auto const& principal = transitionInfo.principal();
401  T::setStreamContext(streamContext_, principal);
402 
403  auto id = principal.id();
404  ServiceWeakToken weakToken = token;
405  auto doneTask = make_waiting_task(
406  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
407  std::exception_ptr excpt;
408  if (iPtr) {
409  excpt = *iPtr;
410  //add context information to the exception and print message
411  try {
412  convertException::wrap([&]() { std::rethrow_exception(excpt); });
413  } catch (cms::Exception& ex) {
414  //TODO: should add the transition type info
415  std::ostringstream ost;
416  if (ex.context().empty()) {
417  ost << "Processing " << T::transitionName() << " " << id;
418  }
419  ServiceRegistry::Operate op(weakToken.lock());
420  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
421  excpt = std::current_exception();
422  }
423 
424  ServiceRegistry::Operate op(weakToken.lock());
425  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
426  }
427  // Caught exception is propagated via WaitingTaskHolder
428  CMS_SA_ALLOW try {
429  ServiceRegistry::Operate op(weakToken.lock());
430  T::postScheduleSignal(actReg_.get(), &streamContext_);
431  } catch (...) {
432  if (not excpt) {
433  excpt = std::current_exception();
434  }
435  }
436  iHolder.doneWaiting(excpt);
437  });
438 
439  auto task = make_functor_task(
440  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
441  auto token = weakToken.lock();
443  // Caught exception is propagated via WaitingTaskHolder
444  CMS_SA_ALLOW try {
445  T::preScheduleSignal(actReg_.get(), &streamContext_);
446 
448  } catch (...) {
449  h.doneWaiting(std::current_exception());
450  return;
451  }
452 
453  for (auto& p : end_paths_) {
454  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
455  }
456 
457  for (auto& p : trig_paths_) {
458  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
459  }
460 
462  });
463 
464  if (streamID_.value() == 0) {
465  //Enqueueing will start another thread if there is only
466  // one thread in the job. Having stream == 0 use spawn
467  // avoids starting up another thread when there is only one stream.
468  iHolder.group()->run([task]() {
469  TaskSentry s{task};
470  task->execute();
471  });
472  } else {
473  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
474  arena.enqueue([task]() {
475  TaskSentry s{task};
476  task->execute();
477  });
478  }
479  }
std::string_view transitionName(GlobalContext::Transition)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 965 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

965  {
966  Worker* found = nullptr;
967  for (auto const& worker : allWorkers()) {
968  if (worker->description()->moduleLabel() == iLabel) {
969  found = worker;
970  break;
971  }
972  }
973  if (nullptr == found) {
974  return;
975  }
976 
977  iMod->replaceModuleFor(found);
978  found->beginStream(streamID_, streamContext_);
979  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 390 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

390  {
391  Service<JobReport> reportSvc;
392  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
393  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1329 of file StreamSchedule.cc.

References results_.

Referenced by processOneEventAsync().

1329 { results_->reset(); }
edm::propagate_const< TrigResPtr > results_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1333 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1333  {
1334  //must be sure we have cleared the count first
1335  for (auto& count : earlyDeleteBranchToCount_) {
1336  count.count = 0;
1337  }
1338  //now reset based on how many helpers use that branch
1340  ++(earlyDeleteBranchToCount_[index].count);
1341  }
1342  for (auto& helper : earlyDeleteHelpers_) {
1343  helper.reset();
1344  }
1345  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 345 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

345 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 346 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

346 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

199 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

226 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 234 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

234 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

230 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 706 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, AlCaHLTBitMon_QueryRunRegistry::string, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

714  {
715  std::vector<Worker*> returnValue;
716  auto const& consumesInfo = worker->consumesInfo();
717  auto moduleLabel = worker->description()->moduleLabel();
718  using namespace productholderindexhelper;
719  for (auto const& ci : consumesInfo) {
720  if (not ci.skipCurrentProcess() and
721  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
722  auto productModuleLabel = std::string(ci.label());
723  bool productFromConditionalModule = false;
724  auto itFound = conditionalModules.find(productModuleLabel);
725  if (itFound == conditionalModules.end()) {
726  //Check to see if this was an alias
727  //note that aliasMap was previously filtered so only the conditional modules remain there
728  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
729  if (foundAlias) {
730  productModuleLabel = *foundAlias;
731  productFromConditionalModule = true;
732  itFound = conditionalModules.find(productModuleLabel);
733  //check that the alias-for conditional module has not been used
734  if (itFound == conditionalModules.end()) {
735  continue;
736  }
737  }
738  } else {
739  //need to check the rest of the data product info
740  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
741  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
742  if (itBranch->second->productInstanceName() == ci.instance()) {
743  if (ci.kindOfType() == PRODUCT_TYPE) {
744  if (ci.type() == itBranch->second->unwrappedTypeID()) {
745  productFromConditionalModule = true;
746  break;
747  }
748  } else {
749  //this is a view
751  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
752  productFromConditionalModule = true;
753  break;
754  }
755  }
756  }
757  }
758  }
759  if (productFromConditionalModule) {
760  auto condWorker =
761  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
762  assert(condWorker);
763 
764  conditionalModules.erase(itFound);
765 
766  auto dependents = tryToPlaceConditionalModules(condWorker,
767  conditionalModules,
768  conditionalModuleBranches,
769  aliasMap,
770  proc_pset,
771  preg,
772  prealloc,
773  processConfiguration);
774  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
775  returnValue.push_back(condWorker);
776  }
777  }
778  }
779  return returnValue;
780  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)

◆ unscheduledWorkers()

AllWorkers const& edm::StreamSchedule::unscheduledWorkers ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManager_.

AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:89
WorkerManager workerManager_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 370 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 380 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 377 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 365 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 364 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 384 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 359 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 358 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 382 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 383 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private