CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkers () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 154 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 162 of file StreamSchedule.h.

◆ NonTrigPaths

typedef std::vector<Path> edm::StreamSchedule::NonTrigPaths

Definition at line 158 of file StreamSchedule.h.

◆ PathWorkers

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 166 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 157 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 160 of file StreamSchedule.h.

◆ TrigResPtr

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 159 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 156 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 161 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 164 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 349 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
373  streamContext_(streamID_, processContext),
374  skippingEvent_(false) {
375  bool hasPath = false;
376  std::vector<std::string> const& pathNames = tns.getTrigPaths();
377  std::vector<std::string> const& endPathNames = tns.getEndPaths();
378 
379  ConditionalTaskHelper conditionalTaskHelper(
380  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
381 
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper);
394  ++trig_bitpos;
395  hasPath = true;
396  }
397 
398  if (hasPath) {
399  // the results inserter stands alone
400  inserter->setTrigResultForStream(streamID.value(), results());
401 
402  results_inserter_ = makeInserter(actions, actReg_, inserter);
404  }
405 
406  // fill normal endpaths
407  int bitpos = 0;
408  end_paths_.reserve(endPathNames.size());
409  for (auto const& end_path_name : endPathNames) {
410  fillEndPath(
411  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
412  ++bitpos;
413  }
414 
415  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
416 
417  //See if all modules were used
418  std::set<std::string> usedWorkerLabels;
419  for (auto const& worker : allWorkers()) {
420  usedWorkerLabels.insert(worker->description()->moduleLabel());
421  }
422  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
423  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
424  std::vector<std::string> unusedLabels;
425  set_difference(modulesInConfigSet.begin(),
426  modulesInConfigSet.end(),
427  usedWorkerLabels.begin(),
428  usedWorkerLabels.end(),
429  back_inserter(unusedLabels));
430  std::set<std::string> unscheduledLabels;
431  std::vector<std::string> shouldBeUsedLabels;
432  if (!unusedLabels.empty()) {
433  //Need to
434  // 1) create worker
435  // 2) if it is a WorkerT<EDProducer>, add it to our list
436  // 3) hand list to our delayed reader
437  for (auto const& label : unusedLabels) {
438  bool isTracked;
439  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
440  assert(isTracked);
441  assert(modulePSet != nullptr);
443  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
444  }
445  if (!shouldBeUsedLabels.empty()) {
446  std::ostringstream unusedStream;
447  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
448  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
449  itLabelEnd = shouldBeUsedLabels.end();
450  itLabel != itLabelEnd;
451  ++itLabel) {
452  unusedStream << ",'" << *itLabel << "'";
453  }
454  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
455  }
456  }
457  number_of_unscheduled_modules_ = unscheduledLabels.size();
458  } // StreamSchedule::StreamSchedule
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
TrigResConstPtr results() const
StreamContext streamContext_
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 292 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

292 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:93

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 256 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

256 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:88

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1182 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1182  {
1183  oLabelsToFill.reserve(trig_paths_.size());
1184  std::transform(trig_paths_.begin(),
1185  trig_paths_.end(),
1186  std::back_inserter(oLabelsToFill),
1187  std::bind(&Path::name, std::placeholders::_1));
1188  }
std::string const & name() const
Definition: Path.h:74
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 953 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1314 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1314  {
1315  using std::placeholders::_1;
1317  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1318  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1319  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1320  }
void clearCounters()
Definition: Worker.h:232
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:198

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 261 of file StreamSchedule.h.

References streamContext_.

261 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 973 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 955 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 922 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

929  {
930  PathWorkers tmpworkers;
931  fillWorkers(
932  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
933 
934  if (!tmpworkers.empty()) {
935  //EndPaths are not supposed to stop if SkipEvent type exception happens
936  end_paths_.emplace_back(bitpos,
937  name,
938  tmpworkers,
939  TrigResPtr(),
940  actionTable(),
941  actReg_,
943  nullptr,
945  } else {
946  empty_end_paths_.push_back(bitpos);
947  }
948  for (WorkerInPath const& workerInPath : tmpworkers) {
949  addToAllWorkers(workerInPath.getWorker());
950  }
951  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 890 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

898  {
899  PathWorkers tmpworkers;
900  fillWorkers(
901  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
902 
903  // an empty path will cause an extra bit that is not used
904  if (!tmpworkers.empty()) {
905  trig_paths_.emplace_back(bitpos,
906  name,
907  tmpworkers,
908  trptr,
909  actionTable(),
910  actReg_,
914  } else {
915  empty_trig_paths_.push_back(bitpos);
916  }
917  for (WorkerInPath const& workerInPath : tmpworkers) {
918  addToAllWorkers(workerInPath.getWorker());
919  }
920  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 787 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

795  {
796  vstring modnames = proc_pset.getParameter<vstring>(pathName);
797  PathWorkers tmpworkers;
798 
799  //Pull out ConditionalTask modules
800  auto condRange = findConditionalTaskModulesRange(modnames);
801 
802  std::unordered_set<std::string> conditionalmods;
803  //An EDAlias may be redirecting to a module on a ConditionalTask
804  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
805  std::unordered_map<std::string, unsigned int> conditionalModOrder;
806  if (condRange.first != condRange.second) {
807  for (auto it = condRange.first; it != condRange.second; ++it) {
808  // ordering needs to skip the # token in the path list
809  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
810  }
811  //the last entry should be ignored since it is required to be "@"
812  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
813  std::make_move_iterator(condRange.second));
814 
815  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
816  modnames.erase(std::prev(condRange.first), modnames.end());
817  }
818 
819  unsigned int placeInPath = 0;
820  for (auto const& name : modnames) {
821  //Modules except EDFilters are set to run concurrently by default
822  bool doNotRunConcurrently = false;
824  if (name[0] == '!') {
825  filterAction = WorkerInPath::Veto;
826  } else if (name[0] == '-' or name[0] == '+') {
827  filterAction = WorkerInPath::Ignore;
828  }
829  if (name[0] == '|' or name[0] == '+') {
830  //cms.wait was specified so do not run concurrently
831  doNotRunConcurrently = true;
832  }
833 
835  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
836  moduleLabel.erase(0, 1);
837  }
838 
839  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
840  if (worker == nullptr) {
841  std::string pathType("endpath");
842  if (!search_all(endPathNames, pathName)) {
843  pathType = std::string("path");
844  }
846  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
847  << "\"\n please check spelling or remove that label from the path.";
848  }
849 
850  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
851  // We have a filter on an end path, and the filter is not explicitly ignored.
852  // See if the filter is allowed.
853  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
854  if (!search_all(allowed_filters, worker->description()->moduleName())) {
855  // Filter is not allowed. Ignore the result, and issue a warning.
856  filterAction = WorkerInPath::Ignore;
857  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
858  << "' with module label '" << moduleLabel << "' appears on EndPath '"
859  << pathName << "'.\n"
860  << "The return value of the filter will be ignored.\n"
861  << "To suppress this warning, either remove the filter from the endpath,\n"
862  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
863  }
864  }
865  bool runConcurrently = not doNotRunConcurrently;
866  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
867  runConcurrently = false;
868  }
869 
870  auto condModules = tryToPlaceConditionalModules(worker,
871  conditionalmods,
872  conditionalModsBranches,
873  conditionalTaskHelper.aliasMap(),
874  proc_pset,
875  preg,
876  prealloc,
877  processConfiguration);
878  for (auto condMod : condModules) {
879  tmpworkers.emplace_back(
880  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
881  }
882 
883  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
884  ++placeInPath;
885  }
886 
887  out.swap(tmpworkers);
888  }
vector< string > vstring
Definition: ExoticaDQM.cc:7
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo &  info
)
private

Definition at line 1092 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

1094  {
1095  if (iExcept) {
1096  // Caught exception is propagated via WaitingTaskHolder
1097  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1102  edm::printCmsExceptionWarning("SkipEvent", e);
1103  *(iExcept.load()) = std::exception_ptr();
1104  } else {
1105  *(iExcept.load()) = std::current_exception();
1106  }
1107  } catch (...) {
1108  *(iExcept.load()) = std::current_exception();
1109  }
1110  }
1111 
1112  if ((not iExcept) and results_->accept()) {
1113  ++total_passed_;
1114  }
1115 
1116  if (nullptr != results_inserter_.get()) {
1117  // Caught exception is propagated to the caller
1118  CMS_SA_ALLOW try {
1119  //Even if there was an exception, we need to allow results inserter
1120  // to run since some module may be waiting on its results.
1121  ParentContext parentContext(&streamContext_);
1122  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1123 
1124  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1125  if (expt) {
1126  std::rethrow_exception(expt);
1127  }
1128  } catch (cms::Exception& ex) {
1129  if (not iExcept) {
1130  if (ex.context().empty()) {
1131  std::ostringstream ost;
1132  ost << "Processing Event " << info.principal().id();
1133  ex.addContext(ost.str());
1134  }
1135  iExcept.store(new std::exception_ptr(std::current_exception()));
1136  }
1137  } catch (...) {
1138  if (not iExcept) {
1139  iExcept.store(new std::exception_ptr(std::current_exception()));
1140  }
1141  }
1142  }
1143  std::exception_ptr ptr;
1144  if (iExcept) {
1145  ptr = *iExcept.load();
1146  }
1147  iWait.doneWaiting(ptr);
1148  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:165
std::list< std::string > const & context() const
Definition: Exception.cc:147
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1150 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1150  {
1151  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1152 
1153  if (iExcept) {
1154  //add context information to the exception and print message
1155  try {
1156  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1157  } catch (cms::Exception& ex) {
1158  bool const cleaningUpAfterException = false;
1159  if (ex.context().empty()) {
1160  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1161  } else {
1162  addContextAndPrintException("", ex, cleaningUpAfterException);
1163  }
1164  iExcept = std::current_exception();
1165  }
1166 
1167  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1168  }
1169  // Caught exception is propagated to the caller
1170  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1171  if (not iExcept) {
1172  iExcept = std::current_exception();
1173  }
1174  }
1175  if (not iExcept) {
1176  resetEarlyDelete();
1177  }
1178 
1179  return iExcept;
1180  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:147

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 975 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

975  {
976  std::vector<ModuleDescription const*> result;
977  result.reserve(allWorkers().size());
978 
979  for (auto const& worker : allWorkers()) {
980  ModuleDescription const* p = worker->description();
981  result.push_back(p);
982  }
983  return result;
984  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1304 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1304  {
1305  rep.eventSummary.totalEvents += totalEvents();
1306  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1307  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1308 
1309  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1310  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1311  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1312  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 460 of file StreamSchedule.cc.

References allWorkers(), cms::cuda::assert(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), dqm-mbProfile::format, newFWLiteAna::found, edm::InEvent, B2GTnPMonitor_cfi::item, gpuClustering::pixelStatus::kEmpty, MainPageGenerator::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

464  {
465  // setup the list with those products actually registered for this job
466  std::multimap<std::string, Worker*> branchToReadingWorker;
467  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
468 
469  const std::vector<std::string> kEmpty;
470  std::map<Worker*, unsigned int> reserveSizeForWorker;
471  unsigned int upperLimitOnReadingWorker = 0;
472  unsigned int upperLimitOnIndicies = 0;
473  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
474 
475  //talk with output modules first
476  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
477  auto comm = iHolder->createOutputModuleCommunicator();
478  if (comm) {
479  if (!branchToReadingWorker.empty()) {
480  //If an OutputModule needs a product, we can't delete it early
481  // so we should remove it from our list
482  SelectedProductsForBranchType const& kept = comm->keptProducts();
483  for (auto const& item : kept[InEvent]) {
484  BranchDescription const& desc = *item.first;
485  auto found = branchToReadingWorker.equal_range(desc.branchName());
486  if (found.first != found.second) {
487  --nUniqueBranchesToDelete;
488  branchToReadingWorker.erase(found.first, found.second);
489  }
490  }
491  }
492  }
493  });
494 
495  if (branchToReadingWorker.empty()) {
496  return;
497  }
498 
499  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
500  for (auto w : allWorkers()) {
501  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
502  continue;
503  }
504  //determine if this module could read a branch we want to delete early
505  auto consumes = w->consumesInfo();
506  if (not consumes.empty()) {
507  bool foundAtLeastOneMatchingBranch = false;
508  for (auto const& product : consumes) {
509  std::string branch = fmt::format("{}_{}_{}_{}",
510  product.type().friendlyClassName(),
511  product.label().data(),
512  product.instance().data(),
513  product.process().data());
514  {
515  //Handle case where worker directly consumes product
516  auto found = branchToReadingWorker.end();
517  if (product.process().empty()) {
518  auto startFound = branchToReadingWorker.lower_bound(branch);
519  if (startFound != branchToReadingWorker.end()) {
520  if (startFound->first.substr(0, branch.size()) == branch) {
521  //match all processNames here, even if it means multiple matches will happen
522  found = startFound;
523  }
524  }
525  } else {
526  auto exactFound = branchToReadingWorker.equal_range(branch);
527  if (exactFound.first != exactFound.second) {
528  found = exactFound.first;
529  }
530  }
531  if (found != branchToReadingWorker.end()) {
532  if (not foundAtLeastOneMatchingBranch) {
533  ++upperLimitOnReadingWorker;
534  foundAtLeastOneMatchingBranch = true;
535  }
536  ++upperLimitOnIndicies;
537  ++reserveSizeForWorker[w];
538  if (nullptr == found->second) {
539  found->second = w;
540  } else {
541  branchToReadingWorker.insert(make_pair(found->first, w));
542  }
543  }
544  }
545  {
546  //Handle case where indirectly consumes product
547  auto found = referencesToBranches.end();
548  if (product.process().empty()) {
549  auto startFound = referencesToBranches.lower_bound(branch);
550  if (startFound != referencesToBranches.end()) {
551  if (startFound->first.substr(0, branch.size()) == branch) {
552  //match all processNames here, even if it means multiple matches will happen
553  found = startFound;
554  }
555  }
556  } else {
557  //can match exactly
558  auto exactFound = referencesToBranches.equal_range(branch);
559  if (exactFound.first != exactFound.second) {
560  found = exactFound.first;
561  }
562  }
563  if (found != referencesToBranches.end()) {
564  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
565  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
566  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
567  continue;
568  }
569  if (not foundAtLeastOneMatchingBranch) {
570  ++upperLimitOnReadingWorker;
571  foundAtLeastOneMatchingBranch = true;
572  }
573  ++upperLimitOnIndicies;
574  ++reserveSizeForWorker[w];
575  if (nullptr == foundInBranchToReadingWorker->second) {
576  foundInBranchToReadingWorker->second = w;
577  } else {
578  branchToReadingWorker.insert(make_pair(itr->second, w));
579  }
580  }
581  }
582  }
583  }
584  }
585  }
586  {
587  auto it = branchToReadingWorker.begin();
588  std::vector<std::string> unusedBranches;
589  while (it != branchToReadingWorker.end()) {
590  if (it->second == nullptr) {
591  unusedBranches.push_back(it->first);
592  //erasing the object invalidates the iterator so must advance it first
593  auto temp = it;
594  ++it;
595  branchToReadingWorker.erase(temp);
596  } else {
597  ++it;
598  }
599  }
600  if (not unusedBranches.empty()) {
601  LogWarning l("UnusedProductsForCanDeleteEarly");
602  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
603  " If possible, remove the producer from the job.";
604  for (auto const& n : unusedBranches) {
605  l << "\n " << n;
606  }
607  }
608  }
609  if (!branchToReadingWorker.empty()) {
610  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
611  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
612  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
613  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
614  std::string lastBranchName;
615  size_t nextOpenIndex = 0;
616  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
617  for (auto& branchAndWorker : branchToReadingWorker) {
618  if (lastBranchName != branchAndWorker.first) {
619  //have to put back the period we removed earlier in order to get the proper name
620  BranchID bid(branchAndWorker.first + ".");
621  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
622  lastBranchName = branchAndWorker.first;
623  }
624  auto found = alreadySeenWorkers.find(branchAndWorker.second);
625  if (alreadySeenWorkers.end() == found) {
626  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
627  // all the branches that might be read by this worker. However, initially we will only tell the
628  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
629  // EarlyDeleteHelper will automatically advance its internal end pointer.
630  size_t index = nextOpenIndex;
631  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
634  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
635  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
636  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
637  nextOpenIndex += nIndices;
638  } else {
639  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
640  }
641  }
642 
643  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
644  // space needed for each module
645  auto itLast = earlyDeleteHelpers_.begin();
646  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
647  if (itLast->end() != it->begin()) {
648  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
649  unsigned int delta = it->begin() - itLast->end();
650  it->shiftIndexPointers(delta);
651 
653  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
654  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
655  }
656  itLast = it;
657  }
659  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
661 
662  //now tell the paths about the deleters
663  for (auto& p : trig_paths_) {
664  p.setEarlyDeleteHelpers(alreadySeenWorkers);
665  }
666  for (auto& p : end_paths_) {
667  p.setEarlyDeleteHelpers(alreadySeenWorkers);
668  }
670  }
671  }
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
assert(be >=bs)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1343 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1346  {
1347  int bitpos = 0;
1348  unsigned int indexEmpty = 0;
1349  unsigned int indexOfPath = 0;
1350  for (auto& pathStatusInserter : pathStatusInserters) {
1351  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1352  WorkerPtr workerPtr(
1353  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1354  pathStatusInserterWorkers_.emplace_back(workerPtr);
1355  workerPtr->setActivityRegistry(actReg_);
1356  addToAllWorkers(workerPtr.get());
1357 
1358  // A little complexity here because a C++ Path object is not
1359  // instantiated and put into end_paths if there are no modules
1360  // on the configured path.
1361  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1362  ++indexEmpty;
1363  } else {
1364  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1365  ++indexOfPath;
1366  }
1367  ++bitpos;
1368  }
1369 
1370  bitpos = 0;
1371  indexEmpty = 0;
1372  indexOfPath = 0;
1373  for (auto& endPathStatusInserter : endPathStatusInserters) {
1374  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1375  WorkerPtr workerPtr(
1376  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1377  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1378  workerPtr->setActivityRegistry(actReg_);
1379  addToAllWorkers(workerPtr.get());
1380 
1381  // A little complexity here because a C++ Path object is not
1382  // instantiated and put into end_paths if there are no modules
1383  // on the configured path.
1384  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1385  ++indexEmpty;
1386  } else {
1387  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1388  ++indexOfPath;
1389  }
1390  ++bitpos;
1391  }
1392  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1232 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1234  {
1235  descriptions.clear();
1236  bool found = false;
1237  TrigPaths::const_iterator itFound;
1238 
1239  if (hint < end_paths_.size()) {
1240  itFound = end_paths_.begin() + hint;
1241  if (itFound->name() == iEndPathLabel)
1242  found = true;
1243  }
1244  if (!found) {
1245  // if the hint did not work, do it the slow way
1246  itFound = std::find_if(
1247  end_paths_.begin(),
1248  end_paths_.end(),
1249  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1250  if (itFound != end_paths_.end())
1251  found = true;
1252  }
1253  if (found) {
1254  descriptions.reserve(itFound->size());
1255  for (size_t i = 0; i < itFound->size(); ++i) {
1256  descriptions.push_back(itFound->getWorker(i)->description());
1257  }
1258  }
1259  }
std::string const & name() const
Definition: Path.h:74

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1203 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1205  {
1206  descriptions.clear();
1207  bool found = false;
1208  TrigPaths::const_iterator itFound;
1209 
1210  if (hint < trig_paths_.size()) {
1211  itFound = trig_paths_.begin() + hint;
1212  if (itFound->name() == iPathLabel)
1213  found = true;
1214  }
1215  if (!found) {
1216  // if the hint did not work, do it the slow way
1217  itFound = std::find_if(
1218  trig_paths_.begin(),
1219  trig_paths_.end(),
1220  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1221  if (itFound != trig_paths_.end())
1222  found = true;
1223  }
1224  if (found) {
1225  descriptions.reserve(itFound->size());
1226  for (size_t i = 0; i < itFound->size(); ++i) {
1227  descriptions.push_back(itFound->getWorker(i)->description());
1228  }
1229  }
1230  }
std::string const & name() const
Definition: Path.h:74

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1190 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1190  {
1191  TrigPaths::const_iterator itFound = std::find_if(
1192  trig_paths_.begin(),
1193  trig_paths_.end(),
1194  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1195  if (itFound != trig_paths_.end()) {
1196  oLabelsToFill.reserve(itFound->size());
1197  for (size_t i = 0; i < itFound->size(); ++i) {
1198  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1199  }
1200  }
1201  }
std::string const & name() const
Definition: Path.h:74

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 259 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 986 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

990  {
991  EventPrincipal& ep = info.principal();
992 
993  // Caught exception is propagated via WaitingTaskHolder
994  CMS_SA_ALLOW try {
995  this->resetAll();
996 
997  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
998 
999  Traits::setStreamContext(streamContext_, ep);
1000  //a service may want to communicate with another service
1001  ServiceRegistry::Operate guard(serviceToken);
1002  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1003 
1004  // Data dependencies need to be set up before marking empty
1005  // (End)Paths complete in case something consumes the status of
1006  // the empty (EndPath)
1009 
1010  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1011  for (int empty_trig_path : empty_trig_paths_) {
1012  results_->at(empty_trig_path) = hltPathStatus;
1013  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1014  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1015  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1016  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1017  if (except) {
1018  iTask.doneWaiting(except);
1019  return;
1020  }
1021  }
1022  for (int empty_end_path : empty_end_paths_) {
1023  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
1024  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1025  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1026  if (except) {
1027  iTask.doneWaiting(except);
1028  return;
1029  }
1030  }
1031 
1032  ++total_events_;
1033 
1034  //use to give priorities on an error to ones from Paths
1035  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1036  auto pathErrorPtr = pathErrorHolder.get();
1037  ServiceWeakToken weakToken = serviceToken;
1038  auto allPathsDone = make_waiting_task(
1039  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1040  ServiceRegistry::Operate operate(weakToken.lock());
1041 
1042  std::exception_ptr ptr;
1043  if (pathError->load()) {
1044  ptr = *pathError->load();
1045  delete pathError->load();
1046  }
1047  if ((not ptr) and iPtr) {
1048  ptr = *iPtr;
1049  }
1050  iTask.doneWaiting(finishProcessOneEvent(ptr));
1051  });
1052  //The holder guarantees that if the paths finish before the loop ends
1053  // that we do not start too soon. It also guarantees that the task will
1054  // run under that condition.
1055  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1056 
1057  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1058  std::exception_ptr const* iPtr) mutable {
1059  ServiceRegistry::Operate operate(weakToken.lock());
1060 
1061  if (iPtr) {
1062  //this is used to prioritize this error over one
1063  // that happens in EndPath or Accumulate
1064  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1065  }
1066  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1067  });
1068 
1069  //The holder guarantees that if the paths finish before the loop ends
1070  // that we do not start too soon. It also guarantees that the task will
1071  // run under that condition.
1072  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1073 
1074  //start end paths first so on single threaded the paths will run first
1075  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1076  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1077  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1078  }
1079 
1080  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1081  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1082  }
1083 
1084  ParentContext parentContext(&streamContext_);
1085  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1086  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1087  } catch (...) {
1088  iTask.doneWaiting(std::current_exception());
1089  }
1090  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 394 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, l1ctLayer2EG_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::transitionName(), trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

397  {
398  auto const& principal = transitionInfo.principal();
399  T::setStreamContext(streamContext_, principal);
400 
401  auto id = principal.id();
402  ServiceWeakToken weakToken = token;
403  auto doneTask = make_waiting_task(
404  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
405  std::exception_ptr excpt;
406  if (iPtr) {
407  excpt = *iPtr;
408  //add context information to the exception and print message
409  try {
410  convertException::wrap([&]() { std::rethrow_exception(excpt); });
411  } catch (cms::Exception& ex) {
412  //TODO: should add the transition type info
413  std::ostringstream ost;
414  if (ex.context().empty()) {
415  ost << "Processing " << T::transitionName() << " " << id;
416  }
417  ServiceRegistry::Operate op(weakToken.lock());
418  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
419  excpt = std::current_exception();
420  }
421 
422  ServiceRegistry::Operate op(weakToken.lock());
423  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
424  }
425  // Caught exception is propagated via WaitingTaskHolder
426  CMS_SA_ALLOW try {
427  ServiceRegistry::Operate op(weakToken.lock());
428  T::postScheduleSignal(actReg_.get(), &streamContext_);
429  } catch (...) {
430  if (not excpt) {
431  excpt = std::current_exception();
432  }
433  }
434  iHolder.doneWaiting(excpt);
435  });
436 
437  auto task = make_functor_task(
438  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
439  auto token = weakToken.lock();
441  // Caught exception is propagated via WaitingTaskHolder
442  CMS_SA_ALLOW try {
443  T::preScheduleSignal(actReg_.get(), &streamContext_);
444 
446  } catch (...) {
447  h.doneWaiting(std::current_exception());
448  return;
449  }
450 
451  for (auto& p : end_paths_) {
452  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
453  }
454 
455  for (auto& p : trig_paths_) {
456  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
457  }
458 
460  });
461 
462  if (streamID_.value() == 0) {
463  //Enqueueing will start another thread if there is only
464  // one thread in the job. Having stream == 0 use spawn
465  // avoids starting up another thread when there is only one stream.
466  iHolder.group()->run([task]() {
467  TaskSentry s{task};
468  task->execute();
469  });
470  } else {
471  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
472  arena.enqueue([task]() {
473  TaskSentry s{task};
474  task->execute();
475  });
476  }
477  }
std::string_view transitionName(GlobalContext::Transition)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

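Replace the module of the worker with label iLabel using iMod (via iMod->replaceModuleFor()), then call beginStream() on that worker. Does nothing if no worker has that label.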

Definition at line 957 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

957  {
958  Worker* found = nullptr;
959  for (auto const& worker : allWorkers()) {
960  if (worker->description()->moduleLabel() == iLabel) {
961  found = worker;
962  break;
963  }
964  }
965  if (nullptr == found) {
966  return;
967  }
968 
969  iMod->replaceModuleFor(found);
970  found->beginStream(streamID_, streamContext_);
971  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 388 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

388  {
389  Service<JobReport> reportSvc;
390  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
391  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1322 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

1322  {
1323  skippingEvent_ = false;
1324  results_->reset();
1325  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1329 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1329  {
1330  //must be sure we have cleared the count first
1331  for (auto& count : earlyDeleteBranchToCount_) {
1332  count.count = 0;
1333  }
1334  //now reset based on how many helpers use that branch
1335  for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1336  ++(earlyDeleteBranchToCount_[index].count);
1337  }
1338  for (auto& helper : earlyDeleteHelpers_) {
1339  helper.reset();
1340  }
1341  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 342 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

342 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 343 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

343 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

199 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

226 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 234 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

234 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

230 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker *  worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 673 of file StreamSchedule.cc.

References cms::cuda::assert(), MicroEventContent_cff::branch, edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, AlCaHLTBitMon_QueryRunRegistry::string, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

681  {
682  std::vector<Worker*> returnValue;
683  auto const& consumesInfo = worker->consumesInfo();
684  auto moduleLabel = worker->description()->moduleLabel();
685  using namespace productholderindexhelper;
686  for (auto const& ci : consumesInfo) {
687  if (not ci.skipCurrentProcess() and
688  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
689  auto productModuleLabel = std::string(ci.label());
690  if (productModuleLabel.empty()) {
691  //this is a consumesMany request
692  for (auto const& branch : conditionalModuleBranches) {
693  //check that the conditional module has not been used
694  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
695  continue;
696  }
697  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
698  if (branch.second->unwrappedTypeID() != ci.type()) {
699  continue;
700  }
701  } else {
702  if (not typeIsViewCompatible(
703  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
704  continue;
705  }
706  }
707 
708  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
709  assert(condWorker);
710 
711  conditionalModules.erase(branch.first);
712 
713  auto dependents = tryToPlaceConditionalModules(condWorker,
714  conditionalModules,
715  conditionalModuleBranches,
716  aliasMap,
717  proc_pset,
718  preg,
719  prealloc,
720  processConfiguration);
721  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
722  returnValue.push_back(condWorker);
723  }
724  } else {
725  //just a regular consumes
726  bool productFromConditionalModule = false;
727  auto itFound = conditionalModules.find(productModuleLabel);
728  if (itFound == conditionalModules.end()) {
729  //Check to see if this was an alias
730  //note that aliasMap was previously filtered so only the conditional modules remain there
731  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
732  if (foundAlias) {
733  productModuleLabel = *foundAlias;
734  productFromConditionalModule = true;
735  itFound = conditionalModules.find(productModuleLabel);
736  //check that the alias-for conditional module has not been used
737  if (itFound == conditionalModules.end()) {
738  continue;
739  }
740  }
741  } else {
742  //need to check the rest of the data product info
743  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
744  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
745  if (itBranch->second->productInstanceName() == ci.instance()) {
746  if (ci.kindOfType() == PRODUCT_TYPE) {
747  if (ci.type() == itBranch->second->unwrappedTypeID()) {
748  productFromConditionalModule = true;
749  break;
750  }
751  } else {
752  //this is a view
753  if (typeIsViewCompatible(ci.type(),
754  TypeID(itBranch->second->wrappedType().typeInfo()),
755  itBranch->second->className())) {
756  productFromConditionalModule = true;
757  break;
758  }
759  }
760  }
761  }
762  }
763  if (productFromConditionalModule) {
764  auto condWorker =
765  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
766  assert(condWorker);
767 
768  conditionalModules.erase(itFound);
769 
770  auto dependents = tryToPlaceConditionalModules(condWorker,
771  conditionalModules,
772  conditionalModuleBranches,
773  aliasMap,
774  proc_pset,
775  preg,
776  prealloc,
777  processConfiguration);
778  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
779  returnValue.push_back(condWorker);
780  }
781  }
782  }
783  }
784  return returnValue;
785  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)

◆ unscheduledWorkers()

AllWorkers const& edm::StreamSchedule::unscheduledWorkers ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManager_.

AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:89
WorkerManager workerManager_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 367 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 377 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 374 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 362 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 361 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 357 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 381 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 353 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ skippingEvent_

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 385 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 379 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 380 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private