CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkers () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 154 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 162 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 158 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 166 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 157 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 160 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 159 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 156 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 161 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 164 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 349 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
373  streamContext_(streamID_, processContext) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
380 
381  int trig_bitpos = 0;
382  trig_paths_.reserve(pathNames.size());
383  for (auto const& trig_name : pathNames) {
384  fillTrigPath(proc_pset,
385  preg,
386  &prealloc,
387  processConfiguration,
388  trig_bitpos,
389  trig_name,
390  results(),
391  endPathNames,
392  conditionalTaskHelper);
393  ++trig_bitpos;
394  hasPath = true;
395  }
396 
397  if (hasPath) {
398  // the results inserter stands alone
399  inserter->setTrigResultForStream(streamID.value(), results());
400 
401  results_inserter_ = makeInserter(actions, actReg_, inserter);
403  }
404 
405  // fill normal endpaths
406  int bitpos = 0;
407  end_paths_.reserve(endPathNames.size());
408  for (auto const& end_path_name : endPathNames) {
409  fillEndPath(
410  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
411  ++bitpos;
412  }
413 
414  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
415 
416  //See if all modules were used
417  std::set<std::string> usedWorkerLabels;
418  for (auto const& worker : allWorkers()) {
419  usedWorkerLabels.insert(worker->description()->moduleLabel());
420  }
421  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
422  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
423  std::vector<std::string> unusedLabels;
424  set_difference(modulesInConfigSet.begin(),
425  modulesInConfigSet.end(),
426  usedWorkerLabels.begin(),
427  usedWorkerLabels.end(),
428  back_inserter(unusedLabels));
429  std::set<std::string> unscheduledLabels;
430  std::vector<std::string> shouldBeUsedLabels;
431  if (!unusedLabels.empty()) {
432  //Need to
433  // 1) create worker
434  // 2) if it is a WorkerT<EDProducer>, add it to our list
435  // 3) hand list to our delayed reader
436  for (auto const& label : unusedLabels) {
437  bool isTracked;
438  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
439  assert(isTracked);
440  assert(modulePSet != nullptr);
442  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
443  }
444  if (!shouldBeUsedLabels.empty()) {
445  std::ostringstream unusedStream;
446  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
447  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
448  itLabelEnd = shouldBeUsedLabels.end();
449  itLabel != itLabelEnd;
450  ++itLabel) {
451  unusedStream << ",'" << *itLabel << "'";
452  }
453  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
454  }
455  }
456  number_of_unscheduled_modules_ = unscheduledLabels.size();
457  } // StreamSchedule::StreamSchedule
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
TrigResConstPtr results() const
StreamContext streamContext_
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 292 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

292 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:93

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 256 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

256 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:88

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1133 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1133  {
1134  oLabelsToFill.reserve(trig_paths_.size());
1135  std::transform(trig_paths_.begin(),
1136  trig_paths_.end(),
1137  std::back_inserter(oLabelsToFill),
1138  std::bind(&Path::name, std::placeholders::_1));
1139  }
std::string const & name() const
Definition: Path.h:73
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 905 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1265 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1265  {
1266  using std::placeholders::_1;
1268  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1269  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1270  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1271  }
void clearCounters()
Definition: Worker.h:232
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:183

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 261 of file StreamSchedule.h.

References streamContext_.

261 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 925 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 907 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 876 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

883  {
884  PathWorkers tmpworkers;
885  fillWorkers(
886  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
887 
888  if (!tmpworkers.empty()) {
889  end_paths_.emplace_back(bitpos,
890  name,
891  tmpworkers,
892  TrigResPtr(),
893  actionTable(),
894  actReg_,
897  } else {
898  empty_end_paths_.push_back(bitpos);
899  }
900  for (WorkerInPath const& workerInPath : tmpworkers) {
901  addToAllWorkers(workerInPath.getWorker());
902  }
903  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 851 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

859  {
860  PathWorkers tmpworkers;
861  fillWorkers(
862  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
863 
864  // an empty path will cause an extra bit that is not used
865  if (!tmpworkers.empty()) {
866  trig_paths_.emplace_back(
867  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
868  } else {
869  empty_trig_paths_.push_back(bitpos);
870  }
871  for (WorkerInPath const& workerInPath : tmpworkers) {
872  addToAllWorkers(workerInPath.getWorker());
873  }
874  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 748 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

756  {
757  vstring modnames = proc_pset.getParameter<vstring>(pathName);
758  PathWorkers tmpworkers;
759 
760  //Pull out ConditionalTask modules
761  auto condRange = findConditionalTaskModulesRange(modnames);
762 
763  std::unordered_set<std::string> conditionalmods;
764  //An EDAlias may be redirecting to a module on a ConditionalTask
765  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
766  std::unordered_map<std::string, unsigned int> conditionalModOrder;
767  if (condRange.first != condRange.second) {
768  for (auto it = condRange.first; it != condRange.second; ++it) {
769  // ordering needs to skip the # token in the path list
770  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
771  }
772  //the last entry should be ignored since it is required to be "@"
773  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
774  std::make_move_iterator(condRange.second));
775 
776  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
777  modnames.erase(std::prev(condRange.first), modnames.end());
778  }
779 
780  unsigned int placeInPath = 0;
781  for (auto const& name : modnames) {
782  //Modules except EDFilters are set to run concurrently by default
783  bool doNotRunConcurrently = false;
785  if (name[0] == '!') {
786  filterAction = WorkerInPath::Veto;
787  } else if (name[0] == '-' or name[0] == '+') {
788  filterAction = WorkerInPath::Ignore;
789  }
790  if (name[0] == '|' or name[0] == '+') {
791  //cms.wait was specified so do not run concurrently
792  doNotRunConcurrently = true;
793  }
794 
796  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
797  moduleLabel.erase(0, 1);
798  }
799 
800  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
801  if (worker == nullptr) {
802  std::string pathType("endpath");
803  if (!search_all(endPathNames, pathName)) {
804  pathType = std::string("path");
805  }
807  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
808  << "\"\n please check spelling or remove that label from the path.";
809  }
810 
811  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
812  // We have a filter on an end path, and the filter is not explicitly ignored.
813  // See if the filter is allowed.
814  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
815  if (!search_all(allowed_filters, worker->description()->moduleName())) {
816  // Filter is not allowed. Ignore the result, and issue a warning.
817  filterAction = WorkerInPath::Ignore;
818  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
819  << "' with module label '" << moduleLabel << "' appears on EndPath '"
820  << pathName << "'.\n"
821  << "The return value of the filter will be ignored.\n"
822  << "To suppress this warning, either remove the filter from the endpath,\n"
823  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
824  }
825  }
826  bool runConcurrently = not doNotRunConcurrently;
827  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
828  runConcurrently = false;
829  }
830 
831  auto condModules = tryToPlaceConditionalModules(worker,
832  conditionalmods,
833  conditionalModsBranches,
834  conditionalTaskHelper.aliasMap(),
835  proc_pset,
836  preg,
837  prealloc,
838  processConfiguration);
839  for (auto condMod : condModules) {
840  tmpworkers.emplace_back(
841  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
842  }
843 
844  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
845  ++placeInPath;
846  }
847 
848  out.swap(tmpworkers);
849  }
vector< string > vstring
Definition: ExoticaDQM.cc:7
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo &  info 
)
private

Definition at line 1044 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, streamContext_, streamID_, total_passed_, and edm::exception_actions::TryToContinue.

Referenced by processOneEventAsync().

1046  {
1047  if (iExcept) {
1048  // Caught exception is propagated via WaitingTaskHolder
1049  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1053  edm::printCmsExceptionWarning("TryToContinue", e);
1054  *(iExcept.load()) = std::exception_ptr();
1055  } else {
1056  *(iExcept.load()) = std::current_exception();
1057  }
1058  } catch (...) {
1059  *(iExcept.load()) = std::current_exception();
1060  }
1061  }
1062 
1063  if ((not iExcept) and results_->accept()) {
1064  ++total_passed_;
1065  }
1066 
1067  if (nullptr != results_inserter_.get()) {
1068  // Caught exception is propagated to the caller
1069  CMS_SA_ALLOW try {
1070  //Even if there was an exception, we need to allow results inserter
1071  // to run since some module may be waiting on its results.
1072  ParentContext parentContext(&streamContext_);
1073  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1074 
1075  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1076  if (expt) {
1077  std::rethrow_exception(expt);
1078  }
1079  } catch (cms::Exception& ex) {
1080  if (not iExcept) {
1081  if (ex.context().empty()) {
1082  std::ostringstream ost;
1083  ost << "Processing Event " << info.principal().id();
1084  ex.addContext(ost.str());
1085  }
1086  iExcept.store(new std::exception_ptr(std::current_exception()));
1087  }
1088  } catch (...) {
1089  if (not iExcept) {
1090  iExcept.store(new std::exception_ptr(std::current_exception()));
1091  }
1092  }
1093  }
1094  std::exception_ptr ptr;
1095  if (iExcept) {
1096  ptr = *iExcept.load();
1097  }
1098  iWait.doneWaiting(ptr);
1099  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:169
std::list< std::string > const & context() const
Definition: Exception.cc:151
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1101 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1101  {
1102  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1103 
1104  if (iExcept) {
1105  //add context information to the exception and print message
1106  try {
1107  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1108  } catch (cms::Exception& ex) {
1109  bool const cleaningUpAfterException = false;
1110  if (ex.context().empty()) {
1111  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1112  } else {
1113  addContextAndPrintException("", ex, cleaningUpAfterException);
1114  }
1115  iExcept = std::current_exception();
1116  }
1117 
1118  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1119  }
1120  // Caught exception is propagated to the caller
1121  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1122  if (not iExcept) {
1123  iExcept = std::current_exception();
1124  }
1125  }
1126  if (not iExcept) {
1127  resetEarlyDelete();
1128  }
1129 
1130  return iExcept;
1131  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 927 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, and mps_fire::result.

927  {
928  std::vector<ModuleDescription const*> result;
929  result.reserve(allWorkers().size());
930 
931  for (auto const& worker : allWorkers()) {
932  ModuleDescription const* p = worker->description();
933  result.push_back(p);
934  }
935  return result;
936  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport & rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1255 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1255  {
1256  rep.eventSummary.totalEvents += totalEvents();
1257  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1258  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1259 
1260  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1261  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1262  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1263  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry &  modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 459 of file StreamSchedule.cc.

References allWorkers(), cms::cuda::assert(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), dqm-mbProfile::format, newFWLiteAna::found, edm::InEvent, B2GTnPMonitor_cfi::item, gpuClustering::pixelStatus::kEmpty, MainPageGenerator::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

463  {
464  // setup the list with those products actually registered for this job
465  std::multimap<std::string, Worker*> branchToReadingWorker;
466  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
467 
468  const std::vector<std::string> kEmpty;
469  std::map<Worker*, unsigned int> reserveSizeForWorker;
470  unsigned int upperLimitOnReadingWorker = 0;
471  unsigned int upperLimitOnIndicies = 0;
472  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
473 
474  //talk with output modules first
475  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
476  auto comm = iHolder->createOutputModuleCommunicator();
477  if (comm) {
478  if (!branchToReadingWorker.empty()) {
479  //If an OutputModule needs a product, we can't delete it early
480  // so we should remove it from our list
481  SelectedProductsForBranchType const& kept = comm->keptProducts();
482  for (auto const& item : kept[InEvent]) {
483  BranchDescription const& desc = *item.first;
484  auto found = branchToReadingWorker.equal_range(desc.branchName());
485  if (found.first != found.second) {
486  --nUniqueBranchesToDelete;
487  branchToReadingWorker.erase(found.first, found.second);
488  }
489  }
490  }
491  }
492  });
493 
494  if (branchToReadingWorker.empty()) {
495  return;
496  }
497 
498  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
499  for (auto w : allWorkers()) {
500  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
501  continue;
502  }
503  //determine if this module could read a branch we want to delete early
504  auto consumes = w->consumesInfo();
505  if (not consumes.empty()) {
506  bool foundAtLeastOneMatchingBranch = false;
507  for (auto const& product : consumes) {
508  std::string branch = fmt::format("{}_{}_{}_{}",
509  product.type().friendlyClassName(),
510  product.label().data(),
511  product.instance().data(),
512  product.process().data());
513  {
514  //Handle case where worker directly consumes product
515  auto found = branchToReadingWorker.end();
516  if (product.process().empty()) {
517  auto startFound = branchToReadingWorker.lower_bound(branch);
518  if (startFound != branchToReadingWorker.end()) {
519  if (startFound->first.substr(0, branch.size()) == branch) {
520  //match all processNames here, even if it means multiple matches will happen
521  found = startFound;
522  }
523  }
524  } else {
525  auto exactFound = branchToReadingWorker.equal_range(branch);
526  if (exactFound.first != exactFound.second) {
527  found = exactFound.first;
528  }
529  }
530  if (found != branchToReadingWorker.end()) {
531  if (not foundAtLeastOneMatchingBranch) {
532  ++upperLimitOnReadingWorker;
533  foundAtLeastOneMatchingBranch = true;
534  }
535  ++upperLimitOnIndicies;
536  ++reserveSizeForWorker[w];
537  if (nullptr == found->second) {
538  found->second = w;
539  } else {
540  branchToReadingWorker.insert(make_pair(found->first, w));
541  }
542  }
543  }
544  {
545  //Handle case where indirectly consumes product
546  auto found = referencesToBranches.end();
547  if (product.process().empty()) {
548  auto startFound = referencesToBranches.lower_bound(branch);
549  if (startFound != referencesToBranches.end()) {
550  if (startFound->first.substr(0, branch.size()) == branch) {
551  //match all processNames here, even if it means multiple matches will happen
552  found = startFound;
553  }
554  }
555  } else {
556  //can match exactly
557  auto exactFound = referencesToBranches.equal_range(branch);
558  if (exactFound.first != exactFound.second) {
559  found = exactFound.first;
560  }
561  }
562  if (found != referencesToBranches.end()) {
563  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
564  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
565  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
566  continue;
567  }
568  if (not foundAtLeastOneMatchingBranch) {
569  ++upperLimitOnReadingWorker;
570  foundAtLeastOneMatchingBranch = true;
571  }
572  ++upperLimitOnIndicies;
573  ++reserveSizeForWorker[w];
574  if (nullptr == foundInBranchToReadingWorker->second) {
575  foundInBranchToReadingWorker->second = w;
576  } else {
577  branchToReadingWorker.insert(make_pair(itr->second, w));
578  }
579  }
580  }
581  }
582  }
583  }
584  }
585  {
586  auto it = branchToReadingWorker.begin();
587  std::vector<std::string> unusedBranches;
588  while (it != branchToReadingWorker.end()) {
589  if (it->second == nullptr) {
590  unusedBranches.push_back(it->first);
591  //erasing the object invalidates the iterator so must advance it first
592  auto temp = it;
593  ++it;
594  branchToReadingWorker.erase(temp);
595  } else {
596  ++it;
597  }
598  }
599  if (not unusedBranches.empty()) {
600  LogWarning l("UnusedProductsForCanDeleteEarly");
601  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
602  " If possible, remove the producer from the job.";
603  for (auto const& n : unusedBranches) {
604  l << "\n " << n;
605  }
606  }
607  }
608  if (!branchToReadingWorker.empty()) {
609  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
610  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
611  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
612  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
613  std::string lastBranchName;
614  size_t nextOpenIndex = 0;
615  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
616  for (auto& branchAndWorker : branchToReadingWorker) {
617  if (lastBranchName != branchAndWorker.first) {
618  //have to put back the period we removed earlier in order to get the proper name
619  BranchID bid(branchAndWorker.first + ".");
620  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
621  lastBranchName = branchAndWorker.first;
622  }
623  auto found = alreadySeenWorkers.find(branchAndWorker.second);
624  if (alreadySeenWorkers.end() == found) {
625  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
626  // all the branches that might be read by this worker. However, initially we will only tell the
627  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
628  // EarlyDeleteHelper will automatically advance its internal end pointer.
629  size_t index = nextOpenIndex;
630  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
633  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
634  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
635  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
636  nextOpenIndex += nIndices;
637  } else {
638  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
639  }
640  }
641 
642  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
643  // space needed for each module
644  auto itLast = earlyDeleteHelpers_.begin();
645  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
646  if (itLast->end() != it->begin()) {
647  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
648  unsigned int delta = it->begin() - itLast->end();
649  it->shiftIndexPointers(delta);
650 
652  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
653  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
654  }
655  itLast = it;
656  }
658  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
660 
661  //now tell the paths about the deleters
662  for (auto& p : trig_paths_) {
663  p.setEarlyDeleteHelpers(alreadySeenWorkers);
664  }
665  for (auto& p : end_paths_) {
666  p.setEarlyDeleteHelpers(alreadySeenWorkers);
667  }
669  }
670  }
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
assert(be >=bs)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1291 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1294  {
1295  int bitpos = 0;
1296  unsigned int indexEmpty = 0;
1297  unsigned int indexOfPath = 0;
1298  for (auto& pathStatusInserter : pathStatusInserters) {
1299  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1300  WorkerPtr workerPtr(
1301  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1302  pathStatusInserterWorkers_.emplace_back(workerPtr);
1303  workerPtr->setActivityRegistry(actReg_);
1304  addToAllWorkers(workerPtr.get());
1305 
1306  // A little complexity here because a C++ Path object is not
1307  // instantiated and put into end_paths if there are no modules
1308  // on the configured path.
1309  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1310  ++indexEmpty;
1311  } else {
1312  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1313  ++indexOfPath;
1314  }
1315  ++bitpos;
1316  }
1317 
1318  bitpos = 0;
1319  indexEmpty = 0;
1320  indexOfPath = 0;
1321  for (auto& endPathStatusInserter : endPathStatusInserters) {
1322  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1323  WorkerPtr workerPtr(
1324  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1325  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1326  workerPtr->setActivityRegistry(actReg_);
1327  addToAllWorkers(workerPtr.get());
1328 
1329  // A little complexity here because a C++ Path object is not
1330  // instantiated and put into end_paths if there are no modules
1331  // on the configured path.
1332  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1333  ++indexEmpty;
1334  } else {
1335  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1336  ++indexOfPath;
1337  }
1338  ++bitpos;
1339  }
1340  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1183 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1185  {
1186  descriptions.clear();
1187  bool found = false;
1188  TrigPaths::const_iterator itFound;
1189 
1190  if (hint < end_paths_.size()) {
1191  itFound = end_paths_.begin() + hint;
1192  if (itFound->name() == iEndPathLabel)
1193  found = true;
1194  }
1195  if (!found) {
1196  // if the hint did not work, do it the slow way
1197  itFound = std::find_if(
1198  end_paths_.begin(),
1199  end_paths_.end(),
1200  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1201  if (itFound != end_paths_.end())
1202  found = true;
1203  }
1204  if (found) {
1205  descriptions.reserve(itFound->size());
1206  for (size_t i = 0; i < itFound->size(); ++i) {
1207  descriptions.push_back(itFound->getWorker(i)->description());
1208  }
1209  }
1210  }
std::string const & name() const
Definition: Path.h:73

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1154 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1156  {
1157  descriptions.clear();
1158  bool found = false;
1159  TrigPaths::const_iterator itFound;
1160 
1161  if (hint < trig_paths_.size()) {
1162  itFound = trig_paths_.begin() + hint;
1163  if (itFound->name() == iPathLabel)
1164  found = true;
1165  }
1166  if (!found) {
1167  // if the hint did not work, do it the slow way
1168  itFound = std::find_if(
1169  trig_paths_.begin(),
1170  trig_paths_.end(),
1171  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1172  if (itFound != trig_paths_.end())
1173  found = true;
1174  }
1175  if (found) {
1176  descriptions.reserve(itFound->size());
1177  for (size_t i = 0; i < itFound->size(); ++i) {
1178  descriptions.push_back(itFound->getWorker(i)->description());
1179  }
1180  }
1181  }
std::string const & name() const
Definition: Path.h:73

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1141 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1141  {
1142  TrigPaths::const_iterator itFound = std::find_if(
1143  trig_paths_.begin(),
1144  trig_paths_.end(),
1145  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1146  if (itFound != trig_paths_.end()) {
1147  oLabelsToFill.reserve(itFound->size());
1148  for (size_t i = 0; i < itFound->size(); ++i) {
1149  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1150  }
1151  }
1152  }
std::string const & name() const
Definition: Path.h:73

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 259 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 938 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

942  {
943  EventPrincipal& ep = info.principal();
944 
945  // Caught exception is propagated via WaitingTaskHolder
946  CMS_SA_ALLOW try {
947  this->resetAll();
948 
949  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
950 
951  Traits::setStreamContext(streamContext_, ep);
952  //a service may want to communicate with another service
953  ServiceRegistry::Operate guard(serviceToken);
954  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
955 
956  // Data dependencies need to be set up before marking empty
957  // (End)Paths complete in case something consumes the status of
958  // the empty (EndPath)
961 
962  HLTPathStatus hltPathStatus(hlt::Pass, 0);
963  for (int empty_trig_path : empty_trig_paths_) {
964  results_->at(empty_trig_path) = hltPathStatus;
965  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
966  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
967  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
968  info, streamID_, ParentContext(&streamContext_), &streamContext_);
969  if (except) {
970  iTask.doneWaiting(except);
971  return;
972  }
973  }
974  for (int empty_end_path : empty_end_paths_) {
975  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
976  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
977  info, streamID_, ParentContext(&streamContext_), &streamContext_);
978  if (except) {
979  iTask.doneWaiting(except);
980  return;
981  }
982  }
983 
984  ++total_events_;
985 
986  //use to give priorities on an error to ones from Paths
987  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
988  auto pathErrorPtr = pathErrorHolder.get();
989  ServiceWeakToken weakToken = serviceToken;
990  auto allPathsDone = make_waiting_task(
991  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
992  ServiceRegistry::Operate operate(weakToken.lock());
993 
994  std::exception_ptr ptr;
995  if (pathError->load()) {
996  ptr = *pathError->load();
997  delete pathError->load();
998  }
999  if ((not ptr) and iPtr) {
1000  ptr = *iPtr;
1001  }
1002  iTask.doneWaiting(finishProcessOneEvent(ptr));
1003  });
1004  //The holder guarantees that if the paths finish before the loop ends
1005  // that we do not start too soon. It also guarantees that the task will
1006  // run under that condition.
1007  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1008 
1009  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1010  std::exception_ptr const* iPtr) mutable {
1011  ServiceRegistry::Operate operate(weakToken.lock());
1012 
1013  if (iPtr) {
1014  //this is used to prioritize this error over one
1015  // that happens in EndPath or Accumulate
1016  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1017  }
1018  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1019  });
1020 
1021  //The holder guarantees that if the paths finish before the loop ends
1022  // that we do not start too soon. It also guarantees that the task will
1023  // run under that condition.
1024  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1025 
1026  //start end paths first so on single threaded the paths will run first
1027  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1028  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1029  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1030  }
1031 
1032  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1033  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1034  }
1035 
1036  ParentContext parentContext(&streamContext_);
1037  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1038  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1039  } catch (...) {
1040  iTask.doneWaiting(std::current_exception());
1041  }
1042  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 393 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, l1ctLayer2EG_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), findAndChange::op, AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::transitionName(), trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

396  {
397  auto const& principal = transitionInfo.principal();
398  T::setStreamContext(streamContext_, principal);
399 
400  auto id = principal.id();
401  ServiceWeakToken weakToken = token;
402  auto doneTask = make_waiting_task(
403  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
404  std::exception_ptr excpt;
405  if (iPtr) {
406  excpt = *iPtr;
407  //add context information to the exception and print message
408  try {
409  convertException::wrap([&]() { std::rethrow_exception(excpt); });
410  } catch (cms::Exception& ex) {
411  //TODO: should add the transition type info
412  std::ostringstream ost;
413  if (ex.context().empty()) {
414  ost << "Processing " << T::transitionName() << " " << id;
415  }
416  ServiceRegistry::Operate op(weakToken.lock());
417  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
418  excpt = std::current_exception();
419  }
420 
421  ServiceRegistry::Operate op(weakToken.lock());
422  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
423  }
424  // Caught exception is propagated via WaitingTaskHolder
425  CMS_SA_ALLOW try {
426  ServiceRegistry::Operate op(weakToken.lock());
427  T::postScheduleSignal(actReg_.get(), &streamContext_);
428  } catch (...) {
429  if (not excpt) {
430  excpt = std::current_exception();
431  }
432  }
433  iHolder.doneWaiting(excpt);
434  });
435 
436  auto task = make_functor_task(
437  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
438  auto token = weakToken.lock();
440  // Caught exception is propagated via WaitingTaskHolder
441  CMS_SA_ALLOW try {
442  T::preScheduleSignal(actReg_.get(), &streamContext_);
443 
445  } catch (...) {
446  h.doneWaiting(std::current_exception());
447  return;
448  }
449 
450  for (auto& p : end_paths_) {
451  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
452  }
453 
454  for (auto& p : trig_paths_) {
455  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
456  }
457 
459  });
460 
461  if (streamID_.value() == 0) {
462  //Enqueueing will start another thread if there is only
463  // one thread in the job. Having stream == 0 use spawn
464  // avoids starting up another thread when there is only one stream.
465  iHolder.group()->run([task]() {
466  TaskSentry s{task};
467  task->execute();
468  });
469  } else {
470  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
471  arena.enqueue([task]() {
472  TaskSentry s{task};
473  task->execute();
474  });
475  }
476  }
std::string_view transitionName(GlobalContext::Transition)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder *  iMod,
std::string const &  iLabel 
)

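Replace the module of the worker whose label is iLabel using the module holder iMod, then call beginStream on that worker. Does nothing if no worker has the label iLabel.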

Definition at line 909 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

909  {
910  Worker* found = nullptr;
911  for (auto const& worker : allWorkers()) {
912  if (worker->description()->moduleLabel() == iLabel) {
913  found = worker;
914  break;
915  }
916  }
917  if (nullptr == found) {
918  return;
919  }
920 
921  iMod->replaceModuleFor(found);
922  found->beginStream(streamID_, streamContext_);
923  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inline private

Definition at line 387 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

387  {
388  Service<JobReport> reportSvc;
389  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
390  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1273 of file StreamSchedule.cc.

References results_.

Referenced by processOneEventAsync().

1273 { results_->reset(); }
edm::propagate_const< TrigResPtr > results_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1277 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1277  {
1278  //must be sure we have cleared the count first
1279  for (auto& count : earlyDeleteBranchToCount_) {
1280  count.count = 0;
1281  }
1282  //now reset based on how many helpers use that branch
1284  ++(earlyDeleteBranchToCount_[index].count);
1285  }
1286  for (auto& helper : earlyDeleteHelpers_) {
1287  helper.reset();
1288  }
1289  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inline private

Definition at line 342 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

342 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 343 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

343 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

199 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

226 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 234 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

234 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

230 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker *  worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 672 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, AlCaHLTBitMon_QueryRunRegistry::string, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

680  {
681  std::vector<Worker*> returnValue;
682  auto const& consumesInfo = worker->consumesInfo();
683  auto moduleLabel = worker->description()->moduleLabel();
684  using namespace productholderindexhelper;
685  for (auto const& ci : consumesInfo) {
686  if (not ci.skipCurrentProcess() and
687  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
688  auto productModuleLabel = std::string(ci.label());
689  bool productFromConditionalModule = false;
690  auto itFound = conditionalModules.find(productModuleLabel);
691  if (itFound == conditionalModules.end()) {
692  //Check to see if this was an alias
693  //note that aliasMap was previously filtered so only the conditional modules remain there
694  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
695  if (foundAlias) {
696  productModuleLabel = *foundAlias;
697  productFromConditionalModule = true;
698  itFound = conditionalModules.find(productModuleLabel);
699  //check that the alias-for conditional module has not been used
700  if (itFound == conditionalModules.end()) {
701  continue;
702  }
703  }
704  } else {
705  //need to check the rest of the data product info
706  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
707  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
708  if (itBranch->second->productInstanceName() == ci.instance()) {
709  if (ci.kindOfType() == PRODUCT_TYPE) {
710  if (ci.type() == itBranch->second->unwrappedTypeID()) {
711  productFromConditionalModule = true;
712  break;
713  }
714  } else {
715  //this is a view
716  if (typeIsViewCompatible(
717  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
718  productFromConditionalModule = true;
719  break;
720  }
721  }
722  }
723  }
724  }
725  if (productFromConditionalModule) {
726  auto condWorker =
727  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
728  assert(condWorker);
729 
730  conditionalModules.erase(itFound);
731 
732  auto dependents = tryToPlaceConditionalModules(condWorker,
733  conditionalModules,
734  conditionalModuleBranches,
735  aliasMap,
736  proc_pset,
737  preg,
738  prealloc,
739  processConfiguration);
740  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
741  returnValue.push_back(condWorker);
742  }
743  }
744  }
745  return returnValue;
746  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)

◆ unscheduledWorkers()

AllWorkers const& edm::StreamSchedule::unscheduledWorkers ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManager_.

AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:89
WorkerManager workerManager_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 367 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 377 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 374 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 362 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 361 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 357 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 381 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 353 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 379 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 380 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private