CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 155 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 163 of file StreamSchedule.h.

◆ NonTrigPaths

typedef std::vector<Path> edm::StreamSchedule::NonTrigPaths

Definition at line 159 of file StreamSchedule.h.

◆ PathWorkers

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 167 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 158 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 161 of file StreamSchedule.h.

◆ TrigResPtr

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 160 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 157 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 162 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 165 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
BranchIDListHelper &  branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 264 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

279  : workerManager_(modReg, areg, actions),
280  actReg_(areg),
281  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
283  trig_paths_(),
284  end_paths_(),
285  total_events_(),
286  total_passed_(),
289  streamContext_(streamID_, processContext),
290  skippingEvent_(false) {
291  bool hasPath = false;
292  std::vector<std::string> const& pathNames = tns.getTrigPaths();
293  std::vector<std::string> const& endPathNames = tns.getEndPaths();
294 
295  ConditionalTaskHelper conditionalTaskHelper(
296  proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);
297 
298  int trig_bitpos = 0;
299  trig_paths_.reserve(pathNames.size());
300  for (auto const& trig_name : pathNames) {
301  fillTrigPath(proc_pset,
302  preg,
303  &prealloc,
304  processConfiguration,
305  trig_bitpos,
306  trig_name,
307  results(),
308  endPathNames,
309  conditionalTaskHelper);
310  ++trig_bitpos;
311  hasPath = true;
312  }
313 
314  if (hasPath) {
315  // the results inserter stands alone
316  inserter->setTrigResultForStream(streamID.value(), results());
317 
318  results_inserter_ = makeInserter(actions, actReg_, inserter);
320  }
321 
322  // fill normal endpaths
323  int bitpos = 0;
324  end_paths_.reserve(endPathNames.size());
325  for (auto const& end_path_name : endPathNames) {
326  fillEndPath(
327  proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
328  ++bitpos;
329  }
330 
331  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
332 
333  //See if all modules were used
334  std::set<std::string> usedWorkerLabels;
335  for (auto const& worker : allWorkers()) {
336  usedWorkerLabels.insert(worker->description()->moduleLabel());
337  }
338  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
339  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
340  std::vector<std::string> unusedLabels;
341  set_difference(modulesInConfigSet.begin(),
342  modulesInConfigSet.end(),
343  usedWorkerLabels.begin(),
344  usedWorkerLabels.end(),
345  back_inserter(unusedLabels));
346  std::set<std::string> unscheduledLabels;
347  std::vector<std::string> shouldBeUsedLabels;
348  if (!unusedLabels.empty()) {
349  //Need to
350  // 1) create worker
351  // 2) if it is a WorkerT<EDProducer>, add it to our list
352  // 3) hand list to our delayed reader
353  for (auto const& label : unusedLabels) {
354  bool isTracked;
355  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
356  assert(isTracked);
357  assert(modulePSet != nullptr);
359  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
360  }
361  if (!shouldBeUsedLabels.empty()) {
362  std::ostringstream unusedStream;
363  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
364  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
365  itLabelEnd = shouldBeUsedLabels.end();
366  itLabel != itLabelEnd;
367  ++itLabel) {
368  unusedStream << ",'" << *itLabel << "'";
369  }
370  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
371  }
372  }
373  number_of_unscheduled_modules_ = unscheduledLabels.size();
374  } // StreamSchedule::StreamSchedule
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
TrigResConstPtr results() const
StreamContext streamContext_
Log< level::Info, false > LogInfo
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 291 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

291 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 256 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

256 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1058 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1058  {
1059  oLabelsToFill.reserve(trig_paths_.size());
1060  std::transform(trig_paths_.begin(),
1061  trig_paths_.end(),
1062  std::back_inserter(oLabelsToFill),
1063  std::bind(&Path::name, std::placeholders::_1));
1064  }
std::string const & name() const
Definition: Path.h:74
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 829 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1190 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1190  {
1191  using std::placeholders::_1;
1193  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1194  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1195  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1196  }
void clearCounters()
Definition: Worker.h:222
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:198

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 260 of file StreamSchedule.h.

References streamContext_.

260 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 849 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 831 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 798 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

805  {
806  PathWorkers tmpworkers;
807  fillWorkers(
808  proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);
809 
810  if (!tmpworkers.empty()) {
811  //EndPaths are not supposed to stop if SkipEvent type exception happens
812  end_paths_.emplace_back(bitpos,
813  name,
814  tmpworkers,
815  TrigResPtr(),
816  actionTable(),
817  actReg_,
819  nullptr,
821  } else {
822  empty_end_paths_.push_back(bitpos);
823  }
824  for (WorkerInPath const& workerInPath : tmpworkers) {
825  addToAllWorkers(workerInPath.getWorker());
826  }
827  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 766 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

774  {
775  PathWorkers tmpworkers;
776  fillWorkers(
777  proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);
778 
779  // an empty path will cause an extra bit that is not used
780  if (!tmpworkers.empty()) {
781  trig_paths_.emplace_back(bitpos,
782  name,
783  tmpworkers,
784  trptr,
785  actionTable(),
786  actReg_,
790  } else {
791  empty_trig_paths_.push_back(bitpos);
792  }
793  for (WorkerInPath const& workerInPath : tmpworkers) {
794  addToAllWorkers(workerInPath.getWorker());
795  }
796  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper 
)
private

Definition at line 662 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, spr::find(), edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

670  {
671  vstring modnames = proc_pset.getParameter<vstring>(pathName);
672  PathWorkers tmpworkers;
673 
674  //Pull out ConditionalTask modules
675  auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
676 
677  std::unordered_set<std::string> conditionalmods;
678  //An EDAlias may be redirecting to a module on a ConditionalTask
679  std::multimap<std::string, AliasInfo> aliasMap;
680  std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
681  std::unordered_map<std::string, unsigned int> conditionalModOrder;
682  if (itCondBegin != modnames.end()) {
683  for (auto it = itCondBegin + 1; it != modnames.begin() + modnames.size() - 1; ++it) {
684  // ordering needs to skip the # token in the path list
685  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
686  }
687  //the last entry should be ignored since it is required to be "@"
688  conditionalmods = std::unordered_set<std::string>(
689  std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));
690 
691  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
692  }
693  modnames.erase(itCondBegin, modnames.end());
694 
695  unsigned int placeInPath = 0;
696  for (auto const& name : modnames) {
697  //Modules except EDFilters are set to run concurrently by default
698  bool doNotRunConcurrently = false;
700  if (name[0] == '!') {
701  filterAction = WorkerInPath::Veto;
702  } else if (name[0] == '-' or name[0] == '+') {
703  filterAction = WorkerInPath::Ignore;
704  }
705  if (name[0] == '|' or name[0] == '+') {
706  //cms.wait was specified so do not run concurrently
707  doNotRunConcurrently = true;
708  }
709 
711  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
712  moduleLabel.erase(0, 1);
713  }
714 
715  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
716  if (worker == nullptr) {
717  std::string pathType("endpath");
718  if (!search_all(endPathNames, pathName)) {
719  pathType = std::string("path");
720  }
722  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
723  << "\"\n please check spelling or remove that label from the path.";
724  }
725 
726  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
727  // We have a filter on an end path, and the filter is not explicitly ignored.
728  // See if the filter is allowed.
729  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
730  if (!search_all(allowed_filters, worker->description()->moduleName())) {
731  // Filter is not allowed. Ignore the result, and issue a warning.
732  filterAction = WorkerInPath::Ignore;
733  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
734  << "' with module label '" << moduleLabel << "' appears on EndPath '"
735  << pathName << "'.\n"
736  << "The return value of the filter will be ignored.\n"
737  << "To suppress this warning, either remove the filter from the endpath,\n"
738  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
739  }
740  }
741  bool runConcurrently = not doNotRunConcurrently;
742  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
743  runConcurrently = false;
744  }
745 
746  auto condModules = tryToPlaceConditionalModules(worker,
747  conditionalmods,
748  conditionalModsBranches,
749  conditionalTaskHelper.aliasMap(),
750  proc_pset,
751  preg,
752  prealloc,
753  processConfiguration);
754  for (auto condMod : condModules) {
755  tmpworkers.emplace_back(
756  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
757  }
758 
759  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
760  ++placeInPath;
761  }
762 
763  out.swap(tmpworkers);
764  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo &  info 
)
private

Definition at line 968 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

970  {
971  if (iExcept) {
972  // Caught exception is propagated via WaitingTaskHolder
973  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
978  edm::printCmsExceptionWarning("SkipEvent", e);
979  *(iExcept.load()) = std::exception_ptr();
980  } else {
981  *(iExcept.load()) = std::current_exception();
982  }
983  } catch (...) {
984  *(iExcept.load()) = std::current_exception();
985  }
986  }
987 
988  if ((not iExcept) and results_->accept()) {
989  ++total_passed_;
990  }
991 
992  if (nullptr != results_inserter_.get()) {
993  // Caught exception is propagated to the caller
994  CMS_SA_ALLOW try {
995  //Even if there was an exception, we need to allow results inserter
996  // to run since some module may be waiting on its results.
997  ParentContext parentContext(&streamContext_);
998  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
999 
1000  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1001  if (expt) {
1002  std::rethrow_exception(expt);
1003  }
1004  } catch (cms::Exception& ex) {
1005  if (not iExcept) {
1006  if (ex.context().empty()) {
1007  std::ostringstream ost;
1008  ost << "Processing Event " << info.principal().id();
1009  ex.addContext(ost.str());
1010  }
1011  iExcept.store(new std::exception_ptr(std::current_exception()));
1012  }
1013  } catch (...) {
1014  if (not iExcept) {
1015  iExcept.store(new std::exception_ptr(std::current_exception()));
1016  }
1017  }
1018  }
1019  std::exception_ptr ptr;
1020  if (iExcept) {
1021  ptr = *iExcept.load();
1022  }
1023  iWait.doneWaiting(ptr);
1024  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:165
std::list< std::string > const & context() const
Definition: Exception.cc:147
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1026 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1026  {
1027  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1028 
1029  if (iExcept) {
1030  //add context information to the exception and print message
1031  try {
1032  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1033  } catch (cms::Exception& ex) {
1034  bool const cleaningUpAfterException = false;
1035  if (ex.context().empty()) {
1036  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1037  } else {
1038  addContextAndPrintException("", ex, cleaningUpAfterException);
1039  }
1040  iExcept = std::current_exception();
1041  }
1042 
1043  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1044  }
1045  // Caught exception is propagated to the caller
1046  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1047  if (not iExcept) {
1048  iExcept = std::current_exception();
1049  }
1050  }
1051  if (not iExcept) {
1052  resetEarlyDelete();
1053  }
1054 
1055  return iExcept;
1056  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:147

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 851 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

851  {
852  std::vector<ModuleDescription const*> result;
853  result.reserve(allWorkers().size());
854 
855  for (auto const& worker : allWorkers()) {
856  ModuleDescription const* p = worker->description();
857  result.push_back(p);
858  }
859  return result;
860  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1180 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1180  {
1181  rep.eventSummary.totalEvents += totalEvents();
1182  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1183  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1184 
1185  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1186  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1187  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1188  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
edm::ProductRegistry const &  preg 
)

Definition at line 376 of file StreamSchedule.cc.

References allWorkers(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, gpuClustering::pixelStatus::kEmpty, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

378  {
379  // setup the list with those products actually registered for this job
380  std::multimap<std::string, Worker*> branchToReadingWorker;
381  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
382 
383  const std::vector<std::string> kEmpty;
384  std::map<Worker*, unsigned int> reserveSizeForWorker;
385  unsigned int upperLimitOnReadingWorker = 0;
386  unsigned int upperLimitOnIndicies = 0;
387  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
388 
389  //talk with output modules first
390  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
391  auto comm = iHolder->createOutputModuleCommunicator();
392  if (comm) {
393  if (!branchToReadingWorker.empty()) {
394  //If an OutputModule needs a product, we can't delete it early
395  // so we should remove it from our list
396  SelectedProductsForBranchType const& kept = comm->keptProducts();
397  for (auto const& item : kept[InEvent]) {
398  BranchDescription const& desc = *item.first;
399  auto found = branchToReadingWorker.equal_range(desc.branchName());
400  if (found.first != found.second) {
401  --nUniqueBranchesToDelete;
402  branchToReadingWorker.erase(found.first, found.second);
403  }
404  }
405  }
406  }
407  });
408 
409  if (branchToReadingWorker.empty()) {
410  return;
411  }
412 
413  for (auto w : allWorkers()) {
414  //determine if this module could read a branch we want to delete early
415  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
416  if (nullptr != pset) {
417  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
418  if (not branches.empty()) {
419  ++upperLimitOnReadingWorker;
420  }
421  for (auto const& branch : branches) {
422  auto found = branchToReadingWorker.equal_range(branch);
423  if (found.first != found.second) {
424  ++upperLimitOnIndicies;
425  ++reserveSizeForWorker[w];
426  if (nullptr == found.first->second) {
427  found.first->second = w;
428  } else {
429  branchToReadingWorker.insert(make_pair(found.first->first, w));
430  }
431  }
432  }
433  }
434  }
435  {
436  auto it = branchToReadingWorker.begin();
437  std::vector<std::string> unusedBranches;
438  while (it != branchToReadingWorker.end()) {
439  if (it->second == nullptr) {
440  unusedBranches.push_back(it->first);
441  //erasing the object invalidates the iterator so must advance it first
442  auto temp = it;
443  ++it;
444  branchToReadingWorker.erase(temp);
445  } else {
446  ++it;
447  }
448  }
449  if (not unusedBranches.empty()) {
450  LogWarning l("UnusedProductsForCanDeleteEarly");
451  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
452  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
453  for (auto const& n : unusedBranches) {
454  l << "\n " << n;
455  }
456  }
457  }
458  if (!branchToReadingWorker.empty()) {
459  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
460  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
461  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
462  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
463  std::string lastBranchName;
464  size_t nextOpenIndex = 0;
465  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
466  for (auto& branchAndWorker : branchToReadingWorker) {
467  if (lastBranchName != branchAndWorker.first) {
468  //have to put back the period we removed earlier in order to get the proper name
469  BranchID bid(branchAndWorker.first + ".");
470  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
471  lastBranchName = branchAndWorker.first;
472  }
473  auto found = alreadySeenWorkers.find(branchAndWorker.second);
474  if (alreadySeenWorkers.end() == found) {
475  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
476  // all the branches that might be read by this worker. However, initially we will only tell the
477  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
478  // EarlyDeleteHelper will automatically advance its internal end pointer.
479  size_t index = nextOpenIndex;
480  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
482  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
483  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
484  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
485  nextOpenIndex += nIndices;
486  } else {
487  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
488  }
489  }
490 
491  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
492  // space needed for each module
493  auto itLast = earlyDeleteHelpers_.begin();
494  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
495  if (itLast->end() != it->begin()) {
496  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
497  unsigned int delta = it->begin() - itLast->end();
498  it->shiftIndexPointers(delta);
499 
501  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
502  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
503  }
504  itLast = it;
505  }
507  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
509 
510  //now tell the paths about the deleters
511  for (auto& p : trig_paths_) {
512  p.setEarlyDeleteHelpers(alreadySeenWorkers);
513  }
514  for (auto& p : end_paths_) {
515  p.setEarlyDeleteHelpers(alreadySeenWorkers);
516  }
518  }
519  }
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1219 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1222  {
1223  int bitpos = 0;
1224  unsigned int indexEmpty = 0;
1225  unsigned int indexOfPath = 0;
1226  for (auto& pathStatusInserter : pathStatusInserters) {
1227  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1228  WorkerPtr workerPtr(
1229  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1230  pathStatusInserterWorkers_.emplace_back(workerPtr);
1231  workerPtr->setActivityRegistry(actReg_);
1232  addToAllWorkers(workerPtr.get());
1233 
1234  // A little complexity here because a C++ Path object is not
1235  // instantiated and put into end_paths if there are no modules
1236  // on the configured path.
1237  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1238  ++indexEmpty;
1239  } else {
1240  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1241  ++indexOfPath;
1242  }
1243  ++bitpos;
1244  }
1245 
1246  bitpos = 0;
1247  indexEmpty = 0;
1248  indexOfPath = 0;
1249  for (auto& endPathStatusInserter : endPathStatusInserters) {
1250  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1251  WorkerPtr workerPtr(
1252  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1253  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1254  workerPtr->setActivityRegistry(actReg_);
1255  addToAllWorkers(workerPtr.get());
1256 
1257  // A little complexity here because a C++ Path object is not
1258  // instantiated and put into end_paths if there are no modules
1259  // on the configured path.
1260  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1261  ++indexEmpty;
1262  } else {
1263  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1264  ++indexOfPath;
1265  }
1266  ++bitpos;
1267  }
1268  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1108 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1110  {
1111  descriptions.clear();
1112  bool found = false;
1113  TrigPaths::const_iterator itFound;
1114 
1115  if (hint < end_paths_.size()) {
1116  itFound = end_paths_.begin() + hint;
1117  if (itFound->name() == iEndPathLabel)
1118  found = true;
1119  }
1120  if (!found) {
1121  // if the hint did not work, do it the slow way
1122  itFound = std::find_if(
1123  end_paths_.begin(),
1124  end_paths_.end(),
1125  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1126  if (itFound != end_paths_.end())
1127  found = true;
1128  }
1129  if (found) {
1130  descriptions.reserve(itFound->size());
1131  for (size_t i = 0; i < itFound->size(); ++i) {
1132  descriptions.push_back(itFound->getWorker(i)->description());
1133  }
1134  }
1135  }
std::string const & name() const
Definition: Path.h:74

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1079 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1081  {
1082  descriptions.clear();
1083  bool found = false;
1084  TrigPaths::const_iterator itFound;
1085 
1086  if (hint < trig_paths_.size()) {
1087  itFound = trig_paths_.begin() + hint;
1088  if (itFound->name() == iPathLabel)
1089  found = true;
1090  }
1091  if (!found) {
1092  // if the hint did not work, do it the slow way
1093  itFound = std::find_if(
1094  trig_paths_.begin(),
1095  trig_paths_.end(),
1096  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1097  if (itFound != trig_paths_.end())
1098  found = true;
1099  }
1100  if (found) {
1101  descriptions.reserve(itFound->size());
1102  for (size_t i = 0; i < itFound->size(); ++i) {
1103  descriptions.push_back(itFound->getWorker(i)->description());
1104  }
1105  }
1106  }
std::string const & name() const
Definition: Path.h:74

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1066 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1066  {
1067  TrigPaths::const_iterator itFound = std::find_if(
1068  trig_paths_.begin(),
1069  trig_paths_.end(),
1070  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1071  if (itFound != trig_paths_.end()) {
1072  oLabelsToFill.reserve(itFound->size());
1073  for (size_t i = 0; i < itFound->size(); ++i) {
1074  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1075  }
1076  }
1077  }
std::string const & name() const
Definition: Path.h:74

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 862 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

866  {
867  EventPrincipal& ep = info.principal();
868 
869  // Caught exception is propagated via WaitingTaskHolder
870  CMS_SA_ALLOW try {
871  this->resetAll();
872 
873  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
874 
875  Traits::setStreamContext(streamContext_, ep);
876  //a service may want to communicate with another service
877  ServiceRegistry::Operate guard(serviceToken);
878  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
879 
880  // Data dependencies need to be set up before marking empty
881  // (End)Paths complete in case something consumes the status of
882  // the empty (EndPath)
885 
886  HLTPathStatus hltPathStatus(hlt::Pass, 0);
887  for (int empty_trig_path : empty_trig_paths_) {
888  results_->at(empty_trig_path) = hltPathStatus;
889  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
890  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
891  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
892  info, streamID_, ParentContext(&streamContext_), &streamContext_);
893  if (except) {
894  iTask.doneWaiting(except);
895  return;
896  }
897  }
898  for (int empty_end_path : empty_end_paths_) {
899  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
900  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
901  info, streamID_, ParentContext(&streamContext_), &streamContext_);
902  if (except) {
903  iTask.doneWaiting(except);
904  return;
905  }
906  }
907 
908  ++total_events_;
909 
910  //use to give priorities on an error to ones from Paths
911  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
912  auto pathErrorPtr = pathErrorHolder.get();
913  ServiceWeakToken weakToken = serviceToken;
914  auto allPathsDone = make_waiting_task(
915  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
916  ServiceRegistry::Operate operate(weakToken.lock());
917 
918  std::exception_ptr ptr;
919  if (pathError->load()) {
920  ptr = *pathError->load();
921  delete pathError->load();
922  }
923  if ((not ptr) and iPtr) {
924  ptr = *iPtr;
925  }
926  iTask.doneWaiting(finishProcessOneEvent(ptr));
927  });
928  //The holder guarantees that if the paths finish before the loop ends
929  // that we do not start too soon. It also guarantees that the task will
930  // run under that condition.
931  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
932 
933  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
934  std::exception_ptr const* iPtr) mutable {
935  ServiceRegistry::Operate operate(weakToken.lock());
936 
937  if (iPtr) {
938  //this is used to prioritize this error over one
939  // that happens in EndPath or Accumulate
940  pathErrorPtr->store(new std::exception_ptr(*iPtr));
941  }
942  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
943  });
944 
945  //The holder guarantees that if the paths finish before the loop ends
946  // that we do not start too soon. It also guarantees that the task will
947  // run under that condition.
948  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
949 
950  //start end paths first so on single threaded the paths will run first
951  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
952  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
953  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
954  }
955 
956  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
957  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
958  }
959 
960  ParentContext parentContext(&streamContext_);
961  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
962  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
963  } catch (...) {
964  iTask.doneWaiting(std::current_exception());
965  }
966  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 393 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, triggerObjects_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

396  {
397  auto const& principal = transitionInfo.principal();
398  T::setStreamContext(streamContext_, principal);
399 
400  auto id = principal.id();
401  ServiceWeakToken weakToken = token;
402  auto doneTask = make_waiting_task(
403  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
404  std::exception_ptr excpt;
405  if (iPtr) {
406  excpt = *iPtr;
407  //add context information to the exception and print message
408  try {
409  convertException::wrap([&]() { std::rethrow_exception(excpt); });
410  } catch (cms::Exception& ex) {
411  //TODO: should add the transition type info
412  std::ostringstream ost;
413  if (ex.context().empty()) {
414  ost << "Processing " << T::transitionName() << " " << id;
415  }
416  ServiceRegistry::Operate op(weakToken.lock());
417  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
418  excpt = std::current_exception();
419  }
420 
421  ServiceRegistry::Operate op(weakToken.lock());
422  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
423  }
424  // Caught exception is propagated via WaitingTaskHolder
425  CMS_SA_ALLOW try {
426  ServiceRegistry::Operate op(weakToken.lock());
427  T::postScheduleSignal(actReg_.get(), &streamContext_);
428  } catch (...) {
429  if (not excpt) {
430  excpt = std::current_exception();
431  }
432  }
433  iHolder.doneWaiting(excpt);
434  });
435 
436  auto task = make_functor_task(
437  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
438  auto token = weakToken.lock();
440  // Caught exception is propagated via WaitingTaskHolder
441  CMS_SA_ALLOW try {
442  T::preScheduleSignal(actReg_.get(), &streamContext_);
443 
445  } catch (...) {
446  h.doneWaiting(std::current_exception());
447  return;
448  }
449 
450  for (auto& p : end_paths_) {
451  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
452  }
453 
454  for (auto& p : trig_paths_) {
455  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
456  }
457 
459  });
460 
461  if (streamID_.value() == 0) {
462  //Enqueueing will start another thread if there is only
463  // one thread in the job. Having stream == 0 use spawn
464  // avoids starting up another thread when there is only one stream.
465  iHolder.group()->run([task]() {
466  TaskSentry s{task};
467  task->execute();
468  });
469  } else {
470  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
471  arena.enqueue([task]() {
472  TaskSentry s{task};
473  task->execute();
474  });
475  }
476  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

Replace the module of the worker with label iLabel by the one held in iMod, then call beginStream on that worker. Does nothing if no worker has that label.

Definition at line 833 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

833  {
834  Worker* found = nullptr;
835  for (auto const& worker : allWorkers()) {
836  if (worker->description()->moduleLabel() == iLabel) {
837  found = worker;
838  break;
839  }
840  }
841  if (nullptr == found) {
842  return;
843  }
844 
845  iMod->replaceModuleFor(found);
846  found->beginStream(streamID_, streamContext_);
847  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 387 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

387  {
388  Service<JobReport> reportSvc;
389  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
390  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1198 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

1198  {
1199  skippingEvent_ = false;
1200  results_->reset();
1201  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1205 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1205  {
1206  //must be sure we have cleared the count first
1207  for (auto& count : earlyDeleteBranchToCount_) {
1208  count.count = 0;
1209  }
1210  //now reset based on how many helpers use that branch
1212  ++(earlyDeleteBranchToCount_[index].count);
1213  }
1214  for (auto& helper : earlyDeleteHelpers_) {
1215  helper.reset();
1216  }
1217  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 341 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

341 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 342 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

342 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 201 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

201 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 228 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

228 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 236 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

236 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 232 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

232 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker *  worker,
std::unordered_set< std::string > &  conditionalModules,
std::multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 521 of file StreamSchedule.cc.

References cms::cuda::assert(), MicroEventContent_cff::branch, edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), edm::ELEMENT_TYPE, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

529  {
530  std::vector<Worker*> returnValue;
531  auto const& consumesInfo = worker->consumesInfo();
532  auto moduleLabel = worker->description()->moduleLabel();
533  using namespace productholderindexhelper;
534  for (auto const& ci : consumesInfo) {
535  if (not ci.skipCurrentProcess() and
536  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
537  auto productModuleLabel = ci.label();
538  if (productModuleLabel.empty()) {
539  //this is a consumesMany request
540  for (auto const& branch : conditionalModuleBranches) {
541  //check that the conditional module has not been used
542  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
543  continue;
544  }
545  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
546  if (branch.second->unwrappedTypeID() != ci.type()) {
547  continue;
548  }
549  } else {
550  if (not typeIsViewCompatible(
551  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
552  continue;
553  }
554  }
555 
556  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
557  assert(condWorker);
558 
559  conditionalModules.erase(branch.first);
560 
561  auto dependents = tryToPlaceConditionalModules(condWorker,
562  conditionalModules,
563  conditionalModuleBranches,
564  aliasMap,
565  proc_pset,
566  preg,
567  prealloc,
568  processConfiguration);
569  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
570  returnValue.push_back(condWorker);
571  }
572  } else {
573  //just a regular consumes
574  bool productFromConditionalModule = false;
575  auto itFound = conditionalModules.find(productModuleLabel);
576  if (itFound == conditionalModules.end()) {
577  //Check to see if this was an alias
578  auto findAlias = aliasMap.equal_range(productModuleLabel);
579  for (auto it = findAlias.first; it != findAlias.second; ++it) {
580  //this was previously filtered so only the conditional modules remain
581  productModuleLabel = it->second.originalModuleLabel;
582  if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
583  if (it->second.friendlyClassName == "*" or
584  (ci.type().friendlyClassName() == it->second.friendlyClassName)) {
585  productFromConditionalModule = true;
586  //need to check the rest of the data product info
587  break;
588  } else if (ci.kindOfType() == ELEMENT_TYPE) {
589  //consume is a View so need to do more intrusive search
590  //find matching branches in module
591  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
592  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
593  if (it->second.originalInstanceLabel == "*" or
594  itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
595  if (typeIsViewCompatible(ci.type(),
596  TypeID(itBranch->second->wrappedType().typeInfo()),
597  itBranch->second->className())) {
598  productFromConditionalModule = true;
599  break;
600  }
601  }
602  }
603  if (productFromConditionalModule) {
604  break;
605  }
606  }
607  }
608  }
609  if (productFromConditionalModule) {
610  itFound = conditionalModules.find(productModuleLabel);
611  //check that the alias-for conditional module has not been used
612  if (itFound == conditionalModules.end()) {
613  continue;
614  }
615  }
616  } else {
617  //need to check the rest of the data product info
618  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
619  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
620  if (itBranch->second->productInstanceName() == ci.instance()) {
621  if (ci.kindOfType() == PRODUCT_TYPE) {
622  if (ci.type() == itBranch->second->unwrappedTypeID()) {
623  productFromConditionalModule = true;
624  break;
625  }
626  } else {
627  //this is a view
628  if (typeIsViewCompatible(ci.type(),
629  TypeID(itBranch->second->wrappedType().typeInfo()),
630  itBranch->second->className())) {
631  productFromConditionalModule = true;
632  break;
633  }
634  }
635  }
636  }
637  }
638  if (productFromConditionalModule) {
639  auto condWorker =
640  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
641  assert(condWorker);
642 
643  conditionalModules.erase(itFound);
644 
645  auto dependents = tryToPlaceConditionalModules(condWorker,
646  conditionalModules,
647  conditionalModuleBranches,
648  aliasMap,
649  proc_pset,
650  preg,
651  prealloc,
652  processConfiguration);
653  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
654  returnValue.push_back(condWorker);
655  }
656  }
657  }
658  }
659  return returnValue;
660  }
assert(be >=bs)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 366 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 376 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 373 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 361 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 380 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 352 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 354 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ skippingEvent_

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 384 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 378 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 379 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private