CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkers () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 155 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 163 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 159 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 167 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 158 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 161 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 160 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 157 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 162 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 165 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 348 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

362  : workerManager_(modReg, areg, actions),
363  actReg_(areg),
364  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
366  trig_paths_(),
367  end_paths_(),
368  total_events_(),
369  total_passed_(),
372  streamContext_(streamID_, processContext),
373  skippingEvent_(false) {
374  bool hasPath = false;
375  std::vector<std::string> const& pathNames = tns.getTrigPaths();
376  std::vector<std::string> const& endPathNames = tns.getEndPaths();
377 
378  ConditionalTaskHelper conditionalTaskHelper(
379  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
380 
381  int trig_bitpos = 0;
382  trig_paths_.reserve(pathNames.size());
383  for (auto const& trig_name : pathNames) {
384  fillTrigPath(proc_pset,
385  preg,
386  &prealloc,
387  processConfiguration,
388  trig_bitpos,
389  trig_name,
390  results(),
391  endPathNames,
392  conditionalTaskHelper);
393  ++trig_bitpos;
394  hasPath = true;
395  }
396 
397  if (hasPath) {
398  // the results inserter stands alone
399  inserter->setTrigResultForStream(streamID.value(), results());
400 
401  results_inserter_ = makeInserter(actions, actReg_, inserter);
403  }
404 
405  // fill normal endpaths
406  int bitpos = 0;
407  end_paths_.reserve(endPathNames.size());
408  for (auto const& end_path_name : endPathNames) {
409  fillEndPath(
410  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
411  ++bitpos;
412  }
413 
414  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
415 
416  //See if all modules were used
417  std::set<std::string> usedWorkerLabels;
418  for (auto const& worker : allWorkers()) {
419  usedWorkerLabels.insert(worker->description()->moduleLabel());
420  }
421  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
422  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
423  std::vector<std::string> unusedLabels;
424  set_difference(modulesInConfigSet.begin(),
425  modulesInConfigSet.end(),
426  usedWorkerLabels.begin(),
427  usedWorkerLabels.end(),
428  back_inserter(unusedLabels));
429  std::set<std::string> unscheduledLabels;
430  std::vector<std::string> shouldBeUsedLabels;
431  if (!unusedLabels.empty()) {
432  //Need to
433  // 1) create worker
434  // 2) if it is a WorkerT<EDProducer>, add it to our list
435  // 3) hand list to our delayed reader
436  for (auto const& label : unusedLabels) {
437  bool isTracked;
438  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
439  assert(isTracked);
440  assert(modulePSet != nullptr);
442  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
443  }
444  if (!shouldBeUsedLabels.empty()) {
445  std::ostringstream unusedStream;
446  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
447  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
448  itLabelEnd = shouldBeUsedLabels.end();
449  itLabel != itLabelEnd;
450  ++itLabel) {
451  unusedStream << ",'" << *itLabel << "'";
452  }
453  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
454  }
455  }
456  number_of_unscheduled_modules_ = unscheduledLabels.size();
457  } // StreamSchedule::StreamSchedule
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
TrigResConstPtr results() const
StreamContext streamContext_
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 291 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

291 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:90

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 255 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

255 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1113 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1113  {
1114  oLabelsToFill.reserve(trig_paths_.size());
1115  std::transform(trig_paths_.begin(),
1116  trig_paths_.end(),
1117  std::back_inserter(oLabelsToFill),
1118  std::bind(&Path::name, std::placeholders::_1));
1119  }
std::string const & name() const
Definition: Path.h:74
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 884 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1245 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1245  {
1246  using std::placeholders::_1;
1248  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1249  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1250  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1251  }
void clearCounters()
Definition: Worker.h:223
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:198

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 260 of file StreamSchedule.h.

References streamContext_.

260 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 904 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 886 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 853 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

860  {
861  PathWorkers tmpworkers;
862  fillWorkers(
863  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
864 
865  if (!tmpworkers.empty()) {
866  //EndPaths are not supposed to stop if SkipEvent type exception happens
867  end_paths_.emplace_back(bitpos,
868  name,
869  tmpworkers,
870  TrigResPtr(),
871  actionTable(),
872  actReg_,
874  nullptr,
876  } else {
877  empty_end_paths_.push_back(bitpos);
878  }
879  for (WorkerInPath const& workerInPath : tmpworkers) {
880  addToAllWorkers(workerInPath.getWorker());
881  }
882  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 821 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

829  {
830  PathWorkers tmpworkers;
831  fillWorkers(
832  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
833 
834  // an empty path will cause an extra bit that is not used
835  if (!tmpworkers.empty()) {
836  trig_paths_.emplace_back(bitpos,
837  name,
838  tmpworkers,
839  trptr,
840  actionTable(),
841  actReg_,
845  } else {
846  empty_trig_paths_.push_back(bitpos);
847  }
848  for (WorkerInPath const& workerInPath : tmpworkers) {
849  addToAllWorkers(workerInPath.getWorker());
850  }
851  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 718 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

726  {
727  vstring modnames = proc_pset.getParameter<vstring>(pathName);
728  PathWorkers tmpworkers;
729 
730  //Pull out ConditionalTask modules
731  auto condRange = findConditionalTaskModulesRange(modnames);
732 
733  std::unordered_set<std::string> conditionalmods;
734  //An EDAlias may be redirecting to a module on a ConditionalTask
735  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
736  std::unordered_map<std::string, unsigned int> conditionalModOrder;
737  if (condRange.first != condRange.second) {
738  for (auto it = condRange.first; it != condRange.second; ++it) {
739  // ordering needs to skip the # token in the path list
740  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
741  }
742  //the last entry should be ignored since it is required to be "@"
743  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
744  std::make_move_iterator(condRange.second));
745 
746  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
747  modnames.erase(std::prev(condRange.first), modnames.end());
748  }
749 
750  unsigned int placeInPath = 0;
751  for (auto const& name : modnames) {
752  //Modules except EDFilters are set to run concurrently by default
753  bool doNotRunConcurrently = false;
755  if (name[0] == '!') {
756  filterAction = WorkerInPath::Veto;
757  } else if (name[0] == '-' or name[0] == '+') {
758  filterAction = WorkerInPath::Ignore;
759  }
760  if (name[0] == '|' or name[0] == '+') {
761  //cms.wait was specified so do not run concurrently
762  doNotRunConcurrently = true;
763  }
764 
766  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
767  moduleLabel.erase(0, 1);
768  }
769 
770  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
771  if (worker == nullptr) {
772  std::string pathType("endpath");
773  if (!search_all(endPathNames, pathName)) {
774  pathType = std::string("path");
775  }
777  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
778  << "\"\n please check spelling or remove that label from the path.";
779  }
780 
781  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
782  // We have a filter on an end path, and the filter is not explicitly ignored.
783  // See if the filter is allowed.
784  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
785  if (!search_all(allowed_filters, worker->description()->moduleName())) {
786  // Filter is not allowed. Ignore the result, and issue a warning.
787  filterAction = WorkerInPath::Ignore;
788  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
789  << "' with module label '" << moduleLabel << "' appears on EndPath '"
790  << pathName << "'.\n"
791  << "The return value of the filter will be ignored.\n"
792  << "To suppress this warning, either remove the filter from the endpath,\n"
793  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
794  }
795  }
796  bool runConcurrently = not doNotRunConcurrently;
797  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
798  runConcurrently = false;
799  }
800 
801  auto condModules = tryToPlaceConditionalModules(worker,
802  conditionalmods,
803  conditionalModsBranches,
804  conditionalTaskHelper.aliasMap(),
805  proc_pset,
806  preg,
807  prealloc,
808  processConfiguration);
809  for (auto condMod : condModules) {
810  tmpworkers.emplace_back(
811  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
812  }
813 
814  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
815  ++placeInPath;
816  }
817 
818  out.swap(tmpworkers);
819  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo info 
)
private

Definition at line 1023 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

1025  {
1026  if (iExcept) {
1027  // Caught exception is propagated via WaitingTaskHolder
1028  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1033  edm::printCmsExceptionWarning("SkipEvent", e);
1034  *(iExcept.load()) = std::exception_ptr();
1035  } else {
1036  *(iExcept.load()) = std::current_exception();
1037  }
1038  } catch (...) {
1039  *(iExcept.load()) = std::current_exception();
1040  }
1041  }
1042 
1043  if ((not iExcept) and results_->accept()) {
1044  ++total_passed_;
1045  }
1046 
1047  if (nullptr != results_inserter_.get()) {
1048  // Caught exception is propagated to the caller
1049  CMS_SA_ALLOW try {
1050  //Even if there was an exception, we need to allow results inserter
1051  // to run since some module may be waiting on its results.
1052  ParentContext parentContext(&streamContext_);
1053  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1054 
1055  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1056  if (expt) {
1057  std::rethrow_exception(expt);
1058  }
1059  } catch (cms::Exception& ex) {
1060  if (not iExcept) {
1061  if (ex.context().empty()) {
1062  std::ostringstream ost;
1063  ost << "Processing Event " << info.principal().id();
1064  ex.addContext(ost.str());
1065  }
1066  iExcept.store(new std::exception_ptr(std::current_exception()));
1067  }
1068  } catch (...) {
1069  if (not iExcept) {
1070  iExcept.store(new std::exception_ptr(std::current_exception()));
1071  }
1072  }
1073  }
1074  std::exception_ptr ptr;
1075  if (iExcept) {
1076  ptr = *iExcept.load();
1077  }
1078  iWait.doneWaiting(ptr);
1079  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:165
std::list< std::string > const & context() const
Definition: Exception.cc:147
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1081 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1081  {
1082  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1083 
1084  if (iExcept) {
1085  //add context information to the exception and print message
1086  try {
1087  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1088  } catch (cms::Exception& ex) {
1089  bool const cleaningUpAfterException = false;
1090  if (ex.context().empty()) {
1091  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1092  } else {
1093  addContextAndPrintException("", ex, cleaningUpAfterException);
1094  }
1095  iExcept = std::current_exception();
1096  }
1097 
1098  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1099  }
1100  // Caught exception is propagated to the caller
1101  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1102  if (not iExcept) {
1103  iExcept = std::current_exception();
1104  }
1105  }
1106  if (not iExcept) {
1107  resetEarlyDelete();
1108  }
1109 
1110  return iExcept;
1111  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:147

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 906 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

906  {
907  std::vector<ModuleDescription const*> result;
908  result.reserve(allWorkers().size());
909 
910  for (auto const& worker : allWorkers()) {
911  ModuleDescription const* p = worker->description();
912  result.push_back(p);
913  }
914  return result;
915  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1235 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1235  {
1236  rep.eventSummary.totalEvents += totalEvents();
1237  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1238  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1239 
1240  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1241  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1242  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1243  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
edm::ProductRegistry const &  preg 
)

Definition at line 459 of file StreamSchedule.cc.

References allWorkers(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, gpuClustering::pixelStatus::kEmpty, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

461  {
462  // setup the list with those products actually registered for this job
463  std::multimap<std::string, Worker*> branchToReadingWorker;
464  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
465 
466  const std::vector<std::string> kEmpty;
467  std::map<Worker*, unsigned int> reserveSizeForWorker;
468  unsigned int upperLimitOnReadingWorker = 0;
469  unsigned int upperLimitOnIndicies = 0;
470  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
471 
472  //talk with output modules first
473  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
474  auto comm = iHolder->createOutputModuleCommunicator();
475  if (comm) {
476  if (!branchToReadingWorker.empty()) {
477  //If an OutputModule needs a product, we can't delete it early
478  // so we should remove it from our list
479  SelectedProductsForBranchType const& kept = comm->keptProducts();
480  for (auto const& item : kept[InEvent]) {
481  BranchDescription const& desc = *item.first;
482  auto found = branchToReadingWorker.equal_range(desc.branchName());
483  if (found.first != found.second) {
484  --nUniqueBranchesToDelete;
485  branchToReadingWorker.erase(found.first, found.second);
486  }
487  }
488  }
489  }
490  });
491 
492  if (branchToReadingWorker.empty()) {
493  return;
494  }
495 
496  for (auto w : allWorkers()) {
497  //determine if this module could read a branch we want to delete early
498  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
499  if (nullptr != pset) {
500  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
501  if (not branches.empty()) {
502  ++upperLimitOnReadingWorker;
503  }
504  for (auto const& branch : branches) {
505  auto found = branchToReadingWorker.equal_range(branch);
506  if (found.first != found.second) {
507  ++upperLimitOnIndicies;
508  ++reserveSizeForWorker[w];
509  if (nullptr == found.first->second) {
510  found.first->second = w;
511  } else {
512  branchToReadingWorker.insert(make_pair(found.first->first, w));
513  }
514  }
515  }
516  }
517  }
518  {
519  auto it = branchToReadingWorker.begin();
520  std::vector<std::string> unusedBranches;
521  while (it != branchToReadingWorker.end()) {
522  if (it->second == nullptr) {
523  unusedBranches.push_back(it->first);
524  //erasing the object invalidates the iterator so must advance it first
525  auto temp = it;
526  ++it;
527  branchToReadingWorker.erase(temp);
528  } else {
529  ++it;
530  }
531  }
532  if (not unusedBranches.empty()) {
533  LogWarning l("UnusedProductsForCanDeleteEarly");
534  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
535  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
536  for (auto const& n : unusedBranches) {
537  l << "\n " << n;
538  }
539  }
540  }
541  if (!branchToReadingWorker.empty()) {
542  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
543  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
544  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
545  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
546  std::string lastBranchName;
547  size_t nextOpenIndex = 0;
548  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
549  for (auto& branchAndWorker : branchToReadingWorker) {
550  if (lastBranchName != branchAndWorker.first) {
551  //have to put back the period we removed earlier in order to get the proper name
552  BranchID bid(branchAndWorker.first + ".");
553  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
554  lastBranchName = branchAndWorker.first;
555  }
556  auto found = alreadySeenWorkers.find(branchAndWorker.second);
557  if (alreadySeenWorkers.end() == found) {
558  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
559  // all the branches that might be read by this worker. However, initially we will only tell the
560  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
561  // EarlyDeleteHelper will automatically advance its internal end pointer.
562  size_t index = nextOpenIndex;
563  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
565  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
566  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
567  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
568  nextOpenIndex += nIndices;
569  } else {
570  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
571  }
572  }
573 
574  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
575  // space needed for each module
576  auto itLast = earlyDeleteHelpers_.begin();
577  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
578  if (itLast->end() != it->begin()) {
579  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
580  unsigned int delta = it->begin() - itLast->end();
581  it->shiftIndexPointers(delta);
582 
584  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
585  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
586  }
587  itLast = it;
588  }
590  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
592 
593  //now tell the paths about the deleters
594  for (auto& p : trig_paths_) {
595  p.setEarlyDeleteHelpers(alreadySeenWorkers);
596  }
597  for (auto& p : end_paths_) {
598  p.setEarlyDeleteHelpers(alreadySeenWorkers);
599  }
601  }
602  }
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1274 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1277  {
1278  int bitpos = 0;
1279  unsigned int indexEmpty = 0;
1280  unsigned int indexOfPath = 0;
1281  for (auto& pathStatusInserter : pathStatusInserters) {
1282  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1283  WorkerPtr workerPtr(
1284  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1285  pathStatusInserterWorkers_.emplace_back(workerPtr);
1286  workerPtr->setActivityRegistry(actReg_);
1287  addToAllWorkers(workerPtr.get());
1288 
1289  // A little complexity here because a C++ Path object is not
1290  // instantiated and put into end_paths if there are no modules
1291  // on the configured path.
1292  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1293  ++indexEmpty;
1294  } else {
1295  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1296  ++indexOfPath;
1297  }
1298  ++bitpos;
1299  }
1300 
1301  bitpos = 0;
1302  indexEmpty = 0;
1303  indexOfPath = 0;
1304  for (auto& endPathStatusInserter : endPathStatusInserters) {
1305  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1306  WorkerPtr workerPtr(
1307  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1308  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1309  workerPtr->setActivityRegistry(actReg_);
1310  addToAllWorkers(workerPtr.get());
1311 
1312  // A little complexity here because a C++ Path object is not
1313  // instantiated and put into end_paths if there are no modules
1314  // on the configured path.
1315  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1316  ++indexEmpty;
1317  } else {
1318  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1319  ++indexOfPath;
1320  }
1321  ++bitpos;
1322  }
1323  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1163 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1165  {
1166  descriptions.clear();
1167  bool found = false;
1168  TrigPaths::const_iterator itFound;
1169 
1170  if (hint < end_paths_.size()) {
1171  itFound = end_paths_.begin() + hint;
1172  if (itFound->name() == iEndPathLabel)
1173  found = true;
1174  }
1175  if (!found) {
1176  // if the hint did not work, do it the slow way
1177  itFound = std::find_if(
1178  end_paths_.begin(),
1179  end_paths_.end(),
1180  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1181  if (itFound != end_paths_.end())
1182  found = true;
1183  }
1184  if (found) {
1185  descriptions.reserve(itFound->size());
1186  for (size_t i = 0; i < itFound->size(); ++i) {
1187  descriptions.push_back(itFound->getWorker(i)->description());
1188  }
1189  }
1190  }
std::string const & name() const
Definition: Path.h:74

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1134 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1136  {
1137  descriptions.clear();
1138  bool found = false;
1139  TrigPaths::const_iterator itFound;
1140 
1141  if (hint < trig_paths_.size()) {
1142  itFound = trig_paths_.begin() + hint;
1143  if (itFound->name() == iPathLabel)
1144  found = true;
1145  }
1146  if (!found) {
1147  // if the hint did not work, do it the slow way
1148  itFound = std::find_if(
1149  trig_paths_.begin(),
1150  trig_paths_.end(),
1151  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1152  if (itFound != trig_paths_.end())
1153  found = true;
1154  }
1155  if (found) {
1156  descriptions.reserve(itFound->size());
1157  for (size_t i = 0; i < itFound->size(); ++i) {
1158  descriptions.push_back(itFound->getWorker(i)->description());
1159  }
1160  }
1161  }
std::string const & name() const
Definition: Path.h:74

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1121 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1121  {
1122  TrigPaths::const_iterator itFound = std::find_if(
1123  trig_paths_.begin(),
1124  trig_paths_.end(),
1125  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1126  if (itFound != trig_paths_.end()) {
1127  oLabelsToFill.reserve(itFound->size());
1128  for (size_t i = 0; i < itFound->size(); ++i) {
1129  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1130  }
1131  }
1132  }
std::string const & name() const
Definition: Path.h:74

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 917 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

921  {
922  EventPrincipal& ep = info.principal();
923 
924  // Caught exception is propagated via WaitingTaskHolder
925  CMS_SA_ALLOW try {
926  this->resetAll();
927 
928  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
929 
930  Traits::setStreamContext(streamContext_, ep);
931  //a service may want to communicate with another service
932  ServiceRegistry::Operate guard(serviceToken);
933  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
934 
935  // Data dependencies need to be set up before marking empty
936  // (End)Paths complete in case something consumes the status of
937  // the empty (EndPath)
940 
941  HLTPathStatus hltPathStatus(hlt::Pass, 0);
942  for (int empty_trig_path : empty_trig_paths_) {
943  results_->at(empty_trig_path) = hltPathStatus;
944  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
945  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
946  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
947  info, streamID_, ParentContext(&streamContext_), &streamContext_);
948  if (except) {
949  iTask.doneWaiting(except);
950  return;
951  }
952  }
953  for (int empty_end_path : empty_end_paths_) {
954  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
955  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
956  info, streamID_, ParentContext(&streamContext_), &streamContext_);
957  if (except) {
958  iTask.doneWaiting(except);
959  return;
960  }
961  }
962 
963  ++total_events_;
964 
965  //use to give priorities on an error to ones from Paths
966  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
967  auto pathErrorPtr = pathErrorHolder.get();
968  ServiceWeakToken weakToken = serviceToken;
969  auto allPathsDone = make_waiting_task(
970  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
971  ServiceRegistry::Operate operate(weakToken.lock());
972 
973  std::exception_ptr ptr;
974  if (pathError->load()) {
975  ptr = *pathError->load();
976  delete pathError->load();
977  }
978  if ((not ptr) and iPtr) {
979  ptr = *iPtr;
980  }
981  iTask.doneWaiting(finishProcessOneEvent(ptr));
982  });
983  //The holder guarantees that if the paths finish before the loop ends
984  // that we do not start too soon. It also guarantees that the task will
985  // run under that condition.
986  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
987 
988  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
989  std::exception_ptr const* iPtr) mutable {
990  ServiceRegistry::Operate operate(weakToken.lock());
991 
992  if (iPtr) {
993  //this is used to prioritize this error over one
994  // that happens in EndPath or Accumulate
995  pathErrorPtr->store(new std::exception_ptr(*iPtr));
996  }
997  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
998  });
999 
1000  //The holder guarantees that if the paths finish before the loop ends
1001  // that we do not start too soon. It also guarantees that the task will
1002  // run under that condition.
1003  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1004 
1005  //start end paths first so on single threaded the paths will run first
1006  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1007  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1008  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1009  }
1010 
1011  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1012  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1013  }
1014 
1015  ParentContext parentContext(&streamContext_);
1016  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1017  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1018  } catch (...) {
1019  iTask.doneWaiting(std::current_exception());
1020  }
1021  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 393 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, l1ctLayer2EG_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::transitionName(), trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

396  {
397  auto const& principal = transitionInfo.principal();
398  T::setStreamContext(streamContext_, principal);
399 
400  auto id = principal.id();
401  ServiceWeakToken weakToken = token;
402  auto doneTask = make_waiting_task(
403  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
404  std::exception_ptr excpt;
405  if (iPtr) {
406  excpt = *iPtr;
407  //add context information to the exception and print message
408  try {
409  convertException::wrap([&]() { std::rethrow_exception(excpt); });
410  } catch (cms::Exception& ex) {
411  //TODO: should add the transition type info
412  std::ostringstream ost;
413  if (ex.context().empty()) {
414  ost << "Processing " << T::transitionName() << " " << id;
415  }
416  ServiceRegistry::Operate op(weakToken.lock());
417  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
418  excpt = std::current_exception();
419  }
420 
421  ServiceRegistry::Operate op(weakToken.lock());
422  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
423  }
424  // Caught exception is propagated via WaitingTaskHolder
425  CMS_SA_ALLOW try {
426  ServiceRegistry::Operate op(weakToken.lock());
427  T::postScheduleSignal(actReg_.get(), &streamContext_);
428  } catch (...) {
429  if (not excpt) {
430  excpt = std::current_exception();
431  }
432  }
433  iHolder.doneWaiting(excpt);
434  });
435 
436  auto task = make_functor_task(
437  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
438  auto token = weakToken.lock();
440  // Caught exception is propagated via WaitingTaskHolder
441  CMS_SA_ALLOW try {
442  T::preScheduleSignal(actReg_.get(), &streamContext_);
443 
445  } catch (...) {
446  h.doneWaiting(std::current_exception());
447  return;
448  }
449 
450  for (auto& p : end_paths_) {
451  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
452  }
453 
454  for (auto& p : trig_paths_) {
455  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
456  }
457 
459  });
460 
461  if (streamID_.value() == 0) {
462  //Enqueueing will start another thread if there is only
463  // one thread in the job. Having stream == 0 use spawn
464  // avoids starting up another thread when there is only one stream.
465  iHolder.group()->run([task]() {
466  TaskSentry s{task};
467  task->execute();
468  });
469  } else {
470  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
471  arena.enqueue([task]() {
472  TaskSentry s{task};
473  task->execute();
474  });
475  }
476  }
std::string_view transitionName(GlobalContext::Transition)
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 888 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

888  {
889  Worker* found = nullptr;
890  for (auto const& worker : allWorkers()) {
891  if (worker->description()->moduleLabel() == iLabel) {
892  found = worker;
893  break;
894  }
895  }
896  if (nullptr == found) {
897  return;
898  }
899 
900  iMod->replaceModuleFor(found);
901  found->beginStream(streamID_, streamContext_);
902  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 387 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

387  {
388  Service<JobReport> reportSvc;
389  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
390  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1253 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

1253  {
1254  skippingEvent_ = false;
1255  results_->reset();
1256  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1260 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1260  {
1261  //must be sure we have cleared the count first
1262  for (auto& count : earlyDeleteBranchToCount_) {
1263  count.count = 0;
1264  }
1265  //now reset based on how many helpers use that branch
1267  ++(earlyDeleteBranchToCount_[index].count);
1268  }
1269  for (auto& helper : earlyDeleteHelpers_) {
1270  helper.reset();
1271  }
1272  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 341 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

341 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 342 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

342 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 200 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

200 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 227 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

227 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 235 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

235 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 231 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

231 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 604 of file StreamSchedule.cc.

References cms::cuda::assert(), MicroEventContent_cff::branch, edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

612  {
613  std::vector<Worker*> returnValue;
614  auto const& consumesInfo = worker->consumesInfo();
615  auto moduleLabel = worker->description()->moduleLabel();
616  using namespace productholderindexhelper;
617  for (auto const& ci : consumesInfo) {
618  if (not ci.skipCurrentProcess() and
619  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
620  auto productModuleLabel = ci.label();
621  if (productModuleLabel.empty()) {
622  //this is a consumesMany request
623  for (auto const& branch : conditionalModuleBranches) {
624  //check that the conditional module has not been used
625  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
626  continue;
627  }
628  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
629  if (branch.second->unwrappedTypeID() != ci.type()) {
630  continue;
631  }
632  } else {
633  if (not typeIsViewCompatible(
634  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
635  continue;
636  }
637  }
638 
639  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
640  assert(condWorker);
641 
642  conditionalModules.erase(branch.first);
643 
644  auto dependents = tryToPlaceConditionalModules(condWorker,
645  conditionalModules,
646  conditionalModuleBranches,
647  aliasMap,
648  proc_pset,
649  preg,
650  prealloc,
651  processConfiguration);
652  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
653  returnValue.push_back(condWorker);
654  }
655  } else {
656  //just a regular consumes
657  bool productFromConditionalModule = false;
658  auto itFound = conditionalModules.find(productModuleLabel);
659  if (itFound == conditionalModules.end()) {
660  //Check to see if this was an alias
661  //note that aliasMap was previously filtered so only the conditional modules remain there
662  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
663  if (foundAlias) {
664  productModuleLabel = *foundAlias;
665  productFromConditionalModule = true;
666  itFound = conditionalModules.find(productModuleLabel);
667  //check that the alias-for conditional module has not been used
668  if (itFound == conditionalModules.end()) {
669  continue;
670  }
671  }
672  } else {
673  //need to check the rest of the data product info
674  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
675  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
676  if (itBranch->second->productInstanceName() == ci.instance()) {
677  if (ci.kindOfType() == PRODUCT_TYPE) {
678  if (ci.type() == itBranch->second->unwrappedTypeID()) {
679  productFromConditionalModule = true;
680  break;
681  }
682  } else {
683  //this is a view
684  if (typeIsViewCompatible(ci.type(),
685  TypeID(itBranch->second->wrappedType().typeInfo()),
686  itBranch->second->className())) {
687  productFromConditionalModule = true;
688  break;
689  }
690  }
691  }
692  }
693  }
694  if (productFromConditionalModule) {
695  auto condWorker =
696  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
697  assert(condWorker);
698 
699  conditionalModules.erase(itFound);
700 
701  auto dependents = tryToPlaceConditionalModules(condWorker,
702  conditionalModules,
703  conditionalModuleBranches,
704  aliasMap,
705  proc_pset,
706  preg,
707  prealloc,
708  processConfiguration);
709  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
710  returnValue.push_back(condWorker);
711  }
712  }
713  }
714  }
715  return returnValue;
716  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)

◆ unscheduledWorkers()

AllWorkers const& edm::StreamSchedule::unscheduledWorkers ( ) const
inline

Definition at line 257 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManager_.

AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:86
WorkerManager workerManager_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 366 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 376 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 373 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 361 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 380 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 352 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 354 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ skippingEvent_

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 384 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 378 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 379 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private