CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkers () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 154 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 162 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 158 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 166 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 157 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 160 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 159 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 156 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 161 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 164 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 349 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

363  : workerManager_(modReg, areg, actions),
364  actReg_(areg),
365  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
367  trig_paths_(),
368  end_paths_(),
369  total_events_(),
370  total_passed_(),
373  streamContext_(streamID_, processContext),
374  skippingEvent_(false) {
375  bool hasPath = false;
376  std::vector<std::string> const& pathNames = tns.getTrigPaths();
377  std::vector<std::string> const& endPathNames = tns.getEndPaths();
378 
379  ConditionalTaskHelper conditionalTaskHelper(
380  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
381 
382  int trig_bitpos = 0;
383  trig_paths_.reserve(pathNames.size());
384  for (auto const& trig_name : pathNames) {
385  fillTrigPath(proc_pset,
386  preg,
387  &prealloc,
388  processConfiguration,
389  trig_bitpos,
390  trig_name,
391  results(),
392  endPathNames,
393  conditionalTaskHelper);
394  ++trig_bitpos;
395  hasPath = true;
396  }
397 
398  if (hasPath) {
399  // the results inserter stands alone
400  inserter->setTrigResultForStream(streamID.value(), results());
401 
402  results_inserter_ = makeInserter(actions, actReg_, inserter);
404  }
405 
406  // fill normal endpaths
407  int bitpos = 0;
408  end_paths_.reserve(endPathNames.size());
409  for (auto const& end_path_name : endPathNames) {
410  fillEndPath(
411  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
412  ++bitpos;
413  }
414 
415  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
416 
417  //See if all modules were used
418  std::set<std::string> usedWorkerLabels;
419  for (auto const& worker : allWorkers()) {
420  usedWorkerLabels.insert(worker->description()->moduleLabel());
421  }
422  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
423  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
424  std::vector<std::string> unusedLabels;
425  set_difference(modulesInConfigSet.begin(),
426  modulesInConfigSet.end(),
427  usedWorkerLabels.begin(),
428  usedWorkerLabels.end(),
429  back_inserter(unusedLabels));
430  std::set<std::string> unscheduledLabels;
431  std::vector<std::string> shouldBeUsedLabels;
432  if (!unusedLabels.empty()) {
433  //Need to
434  // 1) create worker
435  // 2) if it is a WorkerT<EDProducer>, add it to our list
436  // 3) hand list to our delayed reader
437  for (auto const& label : unusedLabels) {
438  bool isTracked;
439  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
440  assert(isTracked);
441  assert(modulePSet != nullptr);
443  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
444  }
445  if (!shouldBeUsedLabels.empty()) {
446  std::ostringstream unusedStream;
447  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
448  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
449  itLabelEnd = shouldBeUsedLabels.end();
450  itLabel != itLabelEnd;
451  ++itLabel) {
452  unusedStream << ",'" << *itLabel << "'";
453  }
454  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
455  }
456  }
457  number_of_unscheduled_modules_ = unscheduledLabels.size();
458  } // StreamSchedule::StreamSchedule
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
TrigResConstPtr results() const
StreamContext streamContext_
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 292 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

292 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:93

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 256 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

256 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:88

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1144 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1144  {
1145  oLabelsToFill.reserve(trig_paths_.size());
1146  std::transform(trig_paths_.begin(),
1147  trig_paths_.end(),
1148  std::back_inserter(oLabelsToFill),
1149  std::bind(&Path::name, std::placeholders::_1));
1150  }
std::string const & name() const
Definition: Path.h:74
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 915 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1276 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1276  {
1277  using std::placeholders::_1;
1279  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1280  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1281  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1282  }
void clearCounters()
Definition: Worker.h:232
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:198

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 261 of file StreamSchedule.h.

References streamContext_.

261 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 935 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 917 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 884 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

891  {
892  PathWorkers tmpworkers;
893  fillWorkers(
894  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
895 
896  if (!tmpworkers.empty()) {
897  //EndPaths are not supposed to stop if SkipEvent type exception happens
898  end_paths_.emplace_back(bitpos,
899  name,
900  tmpworkers,
901  TrigResPtr(),
902  actionTable(),
903  actReg_,
905  nullptr,
907  } else {
908  empty_end_paths_.push_back(bitpos);
909  }
910  for (WorkerInPath const& workerInPath : tmpworkers) {
911  addToAllWorkers(workerInPath.getWorker());
912  }
913  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 852 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

860  {
861  PathWorkers tmpworkers;
862  fillWorkers(
863  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
864 
865  // an empty path will cause an extra bit that is not used
866  if (!tmpworkers.empty()) {
867  trig_paths_.emplace_back(bitpos,
868  name,
869  tmpworkers,
870  trptr,
871  actionTable(),
872  actReg_,
876  } else {
877  empty_trig_paths_.push_back(bitpos);
878  }
879  for (WorkerInPath const& workerInPath : tmpworkers) {
880  addToAllWorkers(workerInPath.getWorker());
881  }
882  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 749 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

757  {
758  vstring modnames = proc_pset.getParameter<vstring>(pathName);
759  PathWorkers tmpworkers;
760 
761  //Pull out ConditionalTask modules
762  auto condRange = findConditionalTaskModulesRange(modnames);
763 
764  std::unordered_set<std::string> conditionalmods;
765  //An EDAlias may be redirecting to a module on a ConditionalTask
766  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
767  std::unordered_map<std::string, unsigned int> conditionalModOrder;
768  if (condRange.first != condRange.second) {
769  for (auto it = condRange.first; it != condRange.second; ++it) {
770  // ordering needs to skip the # token in the path list
771  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
772  }
773  //the last entry should be ignored since it is required to be "@"
774  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
775  std::make_move_iterator(condRange.second));
776 
777  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
778  modnames.erase(std::prev(condRange.first), modnames.end());
779  }
780 
781  unsigned int placeInPath = 0;
782  for (auto const& name : modnames) {
783  //Modules except EDFilters are set to run concurrently by default
784  bool doNotRunConcurrently = false;
786  if (name[0] == '!') {
787  filterAction = WorkerInPath::Veto;
788  } else if (name[0] == '-' or name[0] == '+') {
789  filterAction = WorkerInPath::Ignore;
790  }
791  if (name[0] == '|' or name[0] == '+') {
792  //cms.wait was specified so do not run concurrently
793  doNotRunConcurrently = true;
794  }
795 
797  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
798  moduleLabel.erase(0, 1);
799  }
800 
801  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
802  if (worker == nullptr) {
803  std::string pathType("endpath");
804  if (!search_all(endPathNames, pathName)) {
805  pathType = std::string("path");
806  }
808  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
809  << "\"\n please check spelling or remove that label from the path.";
810  }
811 
812  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
813  // We have a filter on an end path, and the filter is not explicitly ignored.
814  // See if the filter is allowed.
815  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
816  if (!search_all(allowed_filters, worker->description()->moduleName())) {
817  // Filter is not allowed. Ignore the result, and issue a warning.
818  filterAction = WorkerInPath::Ignore;
819  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
820  << "' with module label '" << moduleLabel << "' appears on EndPath '"
821  << pathName << "'.\n"
822  << "The return value of the filter will be ignored.\n"
823  << "To suppress this warning, either remove the filter from the endpath,\n"
824  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
825  }
826  }
827  bool runConcurrently = not doNotRunConcurrently;
828  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
829  runConcurrently = false;
830  }
831 
832  auto condModules = tryToPlaceConditionalModules(worker,
833  conditionalmods,
834  conditionalModsBranches,
835  conditionalTaskHelper.aliasMap(),
836  proc_pset,
837  preg,
838  prealloc,
839  processConfiguration);
840  for (auto condMod : condModules) {
841  tmpworkers.emplace_back(
842  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
843  }
844 
845  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
846  ++placeInPath;
847  }
848 
849  out.swap(tmpworkers);
850  }
vector< string > vstring
Definition: ExoticaDQM.cc:7
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo info 
)
private

Definition at line 1054 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

1056  {
1057  if (iExcept) {
1058  // Caught exception is propagated via WaitingTaskHolder
1059  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1064  edm::printCmsExceptionWarning("SkipEvent", e);
1065  *(iExcept.load()) = std::exception_ptr();
1066  } else {
1067  *(iExcept.load()) = std::current_exception();
1068  }
1069  } catch (...) {
1070  *(iExcept.load()) = std::current_exception();
1071  }
1072  }
1073 
1074  if ((not iExcept) and results_->accept()) {
1075  ++total_passed_;
1076  }
1077 
1078  if (nullptr != results_inserter_.get()) {
1079  // Caught exception is propagated to the caller
1080  CMS_SA_ALLOW try {
1081  //Even if there was an exception, we need to allow results inserter
1082  // to run since some module may be waiting on its results.
1083  ParentContext parentContext(&streamContext_);
1084  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1085 
1086  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1087  if (expt) {
1088  std::rethrow_exception(expt);
1089  }
1090  } catch (cms::Exception& ex) {
1091  if (not iExcept) {
1092  if (ex.context().empty()) {
1093  std::ostringstream ost;
1094  ost << "Processing Event " << info.principal().id();
1095  ex.addContext(ost.str());
1096  }
1097  iExcept.store(new std::exception_ptr(std::current_exception()));
1098  }
1099  } catch (...) {
1100  if (not iExcept) {
1101  iExcept.store(new std::exception_ptr(std::current_exception()));
1102  }
1103  }
1104  }
1105  std::exception_ptr ptr;
1106  if (iExcept) {
1107  ptr = *iExcept.load();
1108  }
1109  iWait.doneWaiting(ptr);
1110  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:169
std::list< std::string > const & context() const
Definition: Exception.cc:151
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1112 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1112  {
1113  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1114 
1115  if (iExcept) {
1116  //add context information to the exception and print message
1117  try {
1118  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1119  } catch (cms::Exception& ex) {
1120  bool const cleaningUpAfterException = false;
1121  if (ex.context().empty()) {
1122  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1123  } else {
1124  addContextAndPrintException("", ex, cleaningUpAfterException);
1125  }
1126  iExcept = std::current_exception();
1127  }
1128 
1129  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1130  }
1131  // Caught exception is propagated to the caller
1132  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1133  if (not iExcept) {
1134  iExcept = std::current_exception();
1135  }
1136  }
1137  if (not iExcept) {
1138  resetEarlyDelete();
1139  }
1140 
1141  return iExcept;
1142  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 937 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

937  {
938  std::vector<ModuleDescription const*> result;
939  result.reserve(allWorkers().size());
940 
941  for (auto const& worker : allWorkers()) {
942  ModuleDescription const* p = worker->description();
943  result.push_back(p);
944  }
945  return result;
946  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1266 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1266  {
1267  rep.eventSummary.totalEvents += totalEvents();
1268  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1269  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1270 
1271  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1272  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1273  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1274  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 460 of file StreamSchedule.cc.

References allWorkers(), cms::cuda::assert(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), dqm-mbProfile::format, newFWLiteAna::found, edm::InEvent, B2GTnPMonitor_cfi::item, gpuClustering::pixelStatus::kEmpty, MainPageGenerator::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

464  {
465  // setup the list with those products actually registered for this job
466  std::multimap<std::string, Worker*> branchToReadingWorker;
467  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
468 
469  const std::vector<std::string> kEmpty;
470  std::map<Worker*, unsigned int> reserveSizeForWorker;
471  unsigned int upperLimitOnReadingWorker = 0;
472  unsigned int upperLimitOnIndicies = 0;
473  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
474 
475  //talk with output modules first
476  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
477  auto comm = iHolder->createOutputModuleCommunicator();
478  if (comm) {
479  if (!branchToReadingWorker.empty()) {
480  //If an OutputModule needs a product, we can't delete it early
481  // so we should remove it from our list
482  SelectedProductsForBranchType const& kept = comm->keptProducts();
483  for (auto const& item : kept[InEvent]) {
484  BranchDescription const& desc = *item.first;
485  auto found = branchToReadingWorker.equal_range(desc.branchName());
486  if (found.first != found.second) {
487  --nUniqueBranchesToDelete;
488  branchToReadingWorker.erase(found.first, found.second);
489  }
490  }
491  }
492  }
493  });
494 
495  if (branchToReadingWorker.empty()) {
496  return;
497  }
498 
499  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
500  for (auto w : allWorkers()) {
501  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
502  continue;
503  }
504  //determine if this module could read a branch we want to delete early
505  auto consumes = w->consumesInfo();
506  if (not consumes.empty()) {
507  bool foundAtLeastOneMatchingBranch = false;
508  for (auto const& product : consumes) {
509  std::string branch = fmt::format("{}_{}_{}_{}",
510  product.type().friendlyClassName(),
511  product.label().data(),
512  product.instance().data(),
513  product.process().data());
514  {
515  //Handle case where worker directly consumes product
516  auto found = branchToReadingWorker.end();
517  if (product.process().empty()) {
518  auto startFound = branchToReadingWorker.lower_bound(branch);
519  if (startFound != branchToReadingWorker.end()) {
520  if (startFound->first.substr(0, branch.size()) == branch) {
521  //match all processNames here, even if it means multiple matches will happen
522  found = startFound;
523  }
524  }
525  } else {
526  auto exactFound = branchToReadingWorker.equal_range(branch);
527  if (exactFound.first != exactFound.second) {
528  found = exactFound.first;
529  }
530  }
531  if (found != branchToReadingWorker.end()) {
532  if (not foundAtLeastOneMatchingBranch) {
533  ++upperLimitOnReadingWorker;
534  foundAtLeastOneMatchingBranch = true;
535  }
536  ++upperLimitOnIndicies;
537  ++reserveSizeForWorker[w];
538  if (nullptr == found->second) {
539  found->second = w;
540  } else {
541  branchToReadingWorker.insert(make_pair(found->first, w));
542  }
543  }
544  }
545  {
546  //Handle case where indirectly consumes product
547  auto found = referencesToBranches.end();
548  if (product.process().empty()) {
549  auto startFound = referencesToBranches.lower_bound(branch);
550  if (startFound != referencesToBranches.end()) {
551  if (startFound->first.substr(0, branch.size()) == branch) {
552  //match all processNames here, even if it means multiple matches will happen
553  found = startFound;
554  }
555  }
556  } else {
557  //can match exactly
558  auto exactFound = referencesToBranches.equal_range(branch);
559  if (exactFound.first != exactFound.second) {
560  found = exactFound.first;
561  }
562  }
563  if (found != referencesToBranches.end()) {
564  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
565  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
566  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
567  continue;
568  }
569  if (not foundAtLeastOneMatchingBranch) {
570  ++upperLimitOnReadingWorker;
571  foundAtLeastOneMatchingBranch = true;
572  }
573  ++upperLimitOnIndicies;
574  ++reserveSizeForWorker[w];
575  if (nullptr == foundInBranchToReadingWorker->second) {
576  foundInBranchToReadingWorker->second = w;
577  } else {
578  branchToReadingWorker.insert(make_pair(itr->second, w));
579  }
580  }
581  }
582  }
583  }
584  }
585  }
586  {
587  auto it = branchToReadingWorker.begin();
588  std::vector<std::string> unusedBranches;
589  while (it != branchToReadingWorker.end()) {
590  if (it->second == nullptr) {
591  unusedBranches.push_back(it->first);
592  //erasing the object invalidates the iterator so must advance it first
593  auto temp = it;
594  ++it;
595  branchToReadingWorker.erase(temp);
596  } else {
597  ++it;
598  }
599  }
600  if (not unusedBranches.empty()) {
601  LogWarning l("UnusedProductsForCanDeleteEarly");
602  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
603  " If possible, remove the producer from the job.";
604  for (auto const& n : unusedBranches) {
605  l << "\n " << n;
606  }
607  }
608  }
609  if (!branchToReadingWorker.empty()) {
610  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
611  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
612  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
613  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
614  std::string lastBranchName;
615  size_t nextOpenIndex = 0;
616  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
617  for (auto& branchAndWorker : branchToReadingWorker) {
618  if (lastBranchName != branchAndWorker.first) {
619  //have to put back the period we removed earlier in order to get the proper name
620  BranchID bid(branchAndWorker.first + ".");
621  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
622  lastBranchName = branchAndWorker.first;
623  }
624  auto found = alreadySeenWorkers.find(branchAndWorker.second);
625  if (alreadySeenWorkers.end() == found) {
626  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
627  // all the branches that might be read by this worker. However, initially we will only tell the
628  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
629  // EarlyDeleteHelper will automatically advance its internal end pointer.
630  size_t index = nextOpenIndex;
631  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
634  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
635  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
636  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
637  nextOpenIndex += nIndices;
638  } else {
639  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
640  }
641  }
642 
643  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
644  // space needed for each module
645  auto itLast = earlyDeleteHelpers_.begin();
646  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
647  if (itLast->end() != it->begin()) {
648  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
649  unsigned int delta = it->begin() - itLast->end();
650  it->shiftIndexPointers(delta);
651 
653  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
654  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
655  }
656  itLast = it;
657  }
659  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
661 
662  //now tell the paths about the deleters
663  for (auto& p : trig_paths_) {
664  p.setEarlyDeleteHelpers(alreadySeenWorkers);
665  }
666  for (auto& p : end_paths_) {
667  p.setEarlyDeleteHelpers(alreadySeenWorkers);
668  }
670  }
671  }
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
assert(be >=bs)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1305 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1308  {
1309  int bitpos = 0;
1310  unsigned int indexEmpty = 0;
1311  unsigned int indexOfPath = 0;
1312  for (auto& pathStatusInserter : pathStatusInserters) {
1313  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1314  WorkerPtr workerPtr(
1315  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1316  pathStatusInserterWorkers_.emplace_back(workerPtr);
1317  workerPtr->setActivityRegistry(actReg_);
1318  addToAllWorkers(workerPtr.get());
1319 
1320  // A little complexity here because a C++ Path object is not
1321  // instantiated and put into end_paths if there are no modules
1322  // on the configured path.
1323  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1324  ++indexEmpty;
1325  } else {
1326  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1327  ++indexOfPath;
1328  }
1329  ++bitpos;
1330  }
1331 
1332  bitpos = 0;
1333  indexEmpty = 0;
1334  indexOfPath = 0;
1335  for (auto& endPathStatusInserter : endPathStatusInserters) {
1336  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1337  WorkerPtr workerPtr(
1338  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1339  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1340  workerPtr->setActivityRegistry(actReg_);
1341  addToAllWorkers(workerPtr.get());
1342 
1343  // A little complexity here because a C++ Path object is not
1344  // instantiated and put into end_paths if there are no modules
1345  // on the configured path.
1346  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1347  ++indexEmpty;
1348  } else {
1349  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1350  ++indexOfPath;
1351  }
1352  ++bitpos;
1353  }
1354  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1194 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1196  {
1197  descriptions.clear();
1198  bool found = false;
1199  TrigPaths::const_iterator itFound;
1200 
1201  if (hint < end_paths_.size()) {
1202  itFound = end_paths_.begin() + hint;
1203  if (itFound->name() == iEndPathLabel)
1204  found = true;
1205  }
1206  if (!found) {
1207  // if the hint did not work, do it the slow way
1208  itFound = std::find_if(
1209  end_paths_.begin(),
1210  end_paths_.end(),
1211  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1212  if (itFound != end_paths_.end())
1213  found = true;
1214  }
1215  if (found) {
1216  descriptions.reserve(itFound->size());
1217  for (size_t i = 0; i < itFound->size(); ++i) {
1218  descriptions.push_back(itFound->getWorker(i)->description());
1219  }
1220  }
1221  }
std::string const & name() const
Definition: Path.h:74

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1165 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1167  {
1168  descriptions.clear();
1169  bool found = false;
1170  TrigPaths::const_iterator itFound;
1171 
1172  if (hint < trig_paths_.size()) {
1173  itFound = trig_paths_.begin() + hint;
1174  if (itFound->name() == iPathLabel)
1175  found = true;
1176  }
1177  if (!found) {
1178  // if the hint did not work, do it the slow way
1179  itFound = std::find_if(
1180  trig_paths_.begin(),
1181  trig_paths_.end(),
1182  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1183  if (itFound != trig_paths_.end())
1184  found = true;
1185  }
1186  if (found) {
1187  descriptions.reserve(itFound->size());
1188  for (size_t i = 0; i < itFound->size(); ++i) {
1189  descriptions.push_back(itFound->getWorker(i)->description());
1190  }
1191  }
1192  }
std::string const & name() const
Definition: Path.h:74

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1152 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1152  {
1153  TrigPaths::const_iterator itFound = std::find_if(
1154  trig_paths_.begin(),
1155  trig_paths_.end(),
1156  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1157  if (itFound != trig_paths_.end()) {
1158  oLabelsToFill.reserve(itFound->size());
1159  for (size_t i = 0; i < itFound->size(); ++i) {
1160  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1161  }
1162  }
1163  }
std::string const & name() const
Definition: Path.h:74

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 259 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 948 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

952  {
953  EventPrincipal& ep = info.principal();
954 
955  // Caught exception is propagated via WaitingTaskHolder
956  CMS_SA_ALLOW try {
957  this->resetAll();
958 
959  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
960 
961  Traits::setStreamContext(streamContext_, ep);
962  //a service may want to communicate with another service
963  ServiceRegistry::Operate guard(serviceToken);
964  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
965 
966  // Data dependencies need to be set up before marking empty
967  // (End)Paths complete in case something consumes the status of
968  // the empty (EndPath)
971 
972  HLTPathStatus hltPathStatus(hlt::Pass, 0);
973  for (int empty_trig_path : empty_trig_paths_) {
974  results_->at(empty_trig_path) = hltPathStatus;
975  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
976  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
977  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
978  info, streamID_, ParentContext(&streamContext_), &streamContext_);
979  if (except) {
980  iTask.doneWaiting(except);
981  return;
982  }
983  }
984  for (int empty_end_path : empty_end_paths_) {
985  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
986  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
987  info, streamID_, ParentContext(&streamContext_), &streamContext_);
988  if (except) {
989  iTask.doneWaiting(except);
990  return;
991  }
992  }
993 
994  ++total_events_;
995 
996  //use to give priorities on an error to ones from Paths
997  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
998  auto pathErrorPtr = pathErrorHolder.get();
999  ServiceWeakToken weakToken = serviceToken;
1000  auto allPathsDone = make_waiting_task(
1001  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1002  ServiceRegistry::Operate operate(weakToken.lock());
1003 
1004  std::exception_ptr ptr;
1005  if (pathError->load()) {
1006  ptr = *pathError->load();
1007  delete pathError->load();
1008  }
1009  if ((not ptr) and iPtr) {
1010  ptr = *iPtr;
1011  }
1012  iTask.doneWaiting(finishProcessOneEvent(ptr));
1013  });
1014  //The holder guarantees that if the paths finish before the loop ends
1015  // that we do not start too soon. It also guarantees that the task will
1016  // run under that condition.
1017  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1018 
1019  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1020  std::exception_ptr const* iPtr) mutable {
1021  ServiceRegistry::Operate operate(weakToken.lock());
1022 
1023  if (iPtr) {
1024  //this is used to prioritize this error over one
1025  // that happens in EndPath or Accumulate
1026  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1027  }
1028  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1029  });
1030 
1031  //The holder guarantees that if the paths finish before the loop ends
1032  // that we do not start too soon. It also guarantees that the task will
1033  // run under that condition.
1034  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1035 
1036  //start end paths first so on single threaded the paths will run first
1037  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1038  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1039  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1040  }
1041 
1042  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1043  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1044  }
1045 
1046  ParentContext parentContext(&streamContext_);
1047  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1048  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1049  } catch (...) {
1050  iTask.doneWaiting(std::current_exception());
1051  }
1052  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 394 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, l1ctLayer2EG_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::transitionName(), trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

397  {
398  auto const& principal = transitionInfo.principal();
399  T::setStreamContext(streamContext_, principal);
400 
401  auto id = principal.id();
402  ServiceWeakToken weakToken = token;
403  auto doneTask = make_waiting_task(
404  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
405  std::exception_ptr excpt;
406  if (iPtr) {
407  excpt = *iPtr;
408  //add context information to the exception and print message
409  try {
410  convertException::wrap([&]() { std::rethrow_exception(excpt); });
411  } catch (cms::Exception& ex) {
412  //TODO: should add the transition type info
413  std::ostringstream ost;
414  if (ex.context().empty()) {
415  ost << "Processing " << T::transitionName() << " " << id;
416  }
417  ServiceRegistry::Operate op(weakToken.lock());
418  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
419  excpt = std::current_exception();
420  }
421 
422  ServiceRegistry::Operate op(weakToken.lock());
423  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
424  }
425  // Caught exception is propagated via WaitingTaskHolder
426  CMS_SA_ALLOW try {
427  ServiceRegistry::Operate op(weakToken.lock());
428  T::postScheduleSignal(actReg_.get(), &streamContext_);
429  } catch (...) {
430  if (not excpt) {
431  excpt = std::current_exception();
432  }
433  }
434  iHolder.doneWaiting(excpt);
435  });
436 
437  auto task = make_functor_task(
438  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
439  auto token = weakToken.lock();
441  // Caught exception is propagated via WaitingTaskHolder
442  CMS_SA_ALLOW try {
443  T::preScheduleSignal(actReg_.get(), &streamContext_);
444 
446  } catch (...) {
447  h.doneWaiting(std::current_exception());
448  return;
449  }
450 
451  for (auto& p : end_paths_) {
452  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
453  }
454 
455  for (auto& p : trig_paths_) {
456  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
457  }
458 
460  });
461 
462  if (streamID_.value() == 0) {
463  //Enqueueing will start another thread if there is only
464  // one thread in the job. Having stream == 0 use spawn
465  // avoids starting up another thread when there is only one stream.
466  iHolder.group()->run([task]() {
467  TaskSentry s{task};
468  task->execute();
469  });
470  } else {
471  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
472  arena.enqueue([task]() {
473  TaskSentry s{task};
474  task->execute();
475  });
476  }
477  }
std::string_view transitionName(GlobalContext::Transition)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 919 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

919  {
920  Worker* found = nullptr;
921  for (auto const& worker : allWorkers()) {
922  if (worker->description()->moduleLabel() == iLabel) {
923  found = worker;
924  break;
925  }
926  }
927  if (nullptr == found) {
928  return;
929  }
930 
931  iMod->replaceModuleFor(found);
932  found->beginStream(streamID_, streamContext_);
933  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 388 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

388  {
389  Service<JobReport> reportSvc;
390  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
391  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1284 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

1284  {
1285  skippingEvent_ = false;
1286  results_->reset();
1287  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1291 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1291  {
1292  //must be sure we have cleared the count first
1293  for (auto& count : earlyDeleteBranchToCount_) {
1294  count.count = 0;
1295  }
1296  //now reset based on how many helpers use that branch
1298  ++(earlyDeleteBranchToCount_[index].count);
1299  }
1300  for (auto& helper : earlyDeleteHelpers_) {
1301  helper.reset();
1302  }
1303  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 342 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

342 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 343 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

343 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

199 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

226 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 234 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

234 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

230 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 673 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, AlCaHLTBitMon_QueryRunRegistry::string, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

681  {
682  std::vector<Worker*> returnValue;
683  auto const& consumesInfo = worker->consumesInfo();
684  auto moduleLabel = worker->description()->moduleLabel();
685  using namespace productholderindexhelper;
686  for (auto const& ci : consumesInfo) {
687  if (not ci.skipCurrentProcess() and
688  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
689  auto productModuleLabel = std::string(ci.label());
690  bool productFromConditionalModule = false;
691  auto itFound = conditionalModules.find(productModuleLabel);
692  if (itFound == conditionalModules.end()) {
693  //Check to see if this was an alias
694  //note that aliasMap was previously filtered so only the conditional modules remain there
695  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
696  if (foundAlias) {
697  productModuleLabel = *foundAlias;
698  productFromConditionalModule = true;
699  itFound = conditionalModules.find(productModuleLabel);
700  //check that the alias-for conditional module has not been used
701  if (itFound == conditionalModules.end()) {
702  continue;
703  }
704  }
705  } else {
706  //need to check the rest of the data product info
707  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
708  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
709  if (itBranch->second->productInstanceName() == ci.instance()) {
710  if (ci.kindOfType() == PRODUCT_TYPE) {
711  if (ci.type() == itBranch->second->unwrappedTypeID()) {
712  productFromConditionalModule = true;
713  break;
714  }
715  } else {
716  //this is a view
718  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
719  productFromConditionalModule = true;
720  break;
721  }
722  }
723  }
724  }
725  }
726  if (productFromConditionalModule) {
727  auto condWorker =
728  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
729  assert(condWorker);
730 
731  conditionalModules.erase(itFound);
732 
733  auto dependents = tryToPlaceConditionalModules(condWorker,
734  conditionalModules,
735  conditionalModuleBranches,
736  aliasMap,
737  proc_pset,
738  preg,
739  prealloc,
740  processConfiguration);
741  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
742  returnValue.push_back(condWorker);
743  }
744  }
745  }
746  return returnValue;
747  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)

◆ unscheduledWorkers()

AllWorkers const& edm::StreamSchedule::unscheduledWorkers ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManager_.

AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:89
WorkerManager workerManager_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 367 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 377 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 374 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 362 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 361 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 357 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 381 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 353 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ skippingEvent_

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 385 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 379 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 380 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private