CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 
class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 153 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 161 of file StreamSchedule.h.

◆ NonTrigPaths

typedef std::vector<Path> edm::StreamSchedule::NonTrigPaths

Definition at line 157 of file StreamSchedule.h.

◆ PathWorkers

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 165 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 156 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 159 of file StreamSchedule.h.

◆ TrigResPtr

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 158 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 155 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 160 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 163 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
BranchIDListHelper &  branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 138 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), trig_paths_, edm::StreamID::value(), and workerManager_.

153  : workerManager_(modReg, areg, actions),
154  actReg_(areg),
155  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
157  trig_paths_(),
158  end_paths_(),
159  total_events_(),
160  total_passed_(),
163  streamContext_(streamID_, processContext),
164  skippingEvent_(false) {
165  bool hasPath = false;
166  std::vector<std::string> const& pathNames = tns.getTrigPaths();
167  std::vector<std::string> const& endPathNames = tns.getEndPaths();
168 
169  int trig_bitpos = 0;
170  trig_paths_.reserve(pathNames.size());
171  for (auto const& trig_name : pathNames) {
172  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
173  ++trig_bitpos;
174  hasPath = true;
175  }
176 
177  if (hasPath) {
178  // the results inserter stands alone
179  inserter->setTrigResultForStream(streamID.value(), results());
180 
181  results_inserter_ = makeInserter(actions, actReg_, inserter);
183  }
184 
185  // fill normal endpaths
186  int bitpos = 0;
187  end_paths_.reserve(endPathNames.size());
188  for (auto const& end_path_name : endPathNames) {
189  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
190  ++bitpos;
191  }
192 
193  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
194 
195  //See if all modules were used
196  std::set<std::string> usedWorkerLabels;
197  for (auto const& worker : allWorkers()) {
198  usedWorkerLabels.insert(worker->description()->moduleLabel());
199  }
200  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
201  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
202  std::vector<std::string> unusedLabels;
203  set_difference(modulesInConfigSet.begin(),
204  modulesInConfigSet.end(),
205  usedWorkerLabels.begin(),
206  usedWorkerLabels.end(),
207  back_inserter(unusedLabels));
208  std::set<std::string> unscheduledLabels;
209  std::vector<std::string> shouldBeUsedLabels;
210  if (!unusedLabels.empty()) {
211  //Need to
212  // 1) create worker
213  // 2) if it is a WorkerT<EDProducer>, add it to our list
214  // 3) hand list to our delayed reader
215  for (auto const& label : unusedLabels) {
216  bool isTracked;
217  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
218  assert(isTracked);
219  assert(modulePSet != nullptr);
220  workerManager_.addToUnscheduledWorkers(
221  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
222  }
223  if (!shouldBeUsedLabels.empty()) {
224  std::ostringstream unusedStream;
225  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
226  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
227  itLabelEnd = shouldBeUsedLabels.end();
228  itLabel != itLabelEnd;
229  ++itLabel) {
230  unusedStream << ",'" << *itLabel << "'";
231  }
232  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
233  }
234  }
235  number_of_unscheduled_modules_ = unscheduledLabels.size();
236  } // StreamSchedule::StreamSchedule
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
constexpr element_type const * get() const
char const * label
TrigResConstPtr results() const
StreamContext streamContext_
Log< level::Info, false > LogInfo
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
unsigned int value() const
Definition: StreamID.h:43
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 282 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

282 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private

◆ allWorkers()

AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 254 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

254 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 982 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

982  {
983  oLabelsToFill.reserve(trig_paths_.size());
984  std::transform(trig_paths_.begin(),
985  trig_paths_.end(),
986  std::back_inserter(oLabelsToFill),
987  std::bind(&Path::name, std::placeholders::_1));
988  }
std::string const & name() const
Definition: Path.h:74
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 756 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1114 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1114  {
1115  using std::placeholders::_1;
1117  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1118  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1119  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
1120  }
void clearCounters()
Definition: Worker.h:222
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Path.cc:198

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References streamContext_.

258 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 776 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 758 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 727 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

733  {
734  PathWorkers tmpworkers;
735  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
736 
737  if (!tmpworkers.empty()) {
738  //EndPaths are not supposed to stop if SkipEvent type exception happens
739  end_paths_.emplace_back(bitpos,
740  name,
741  tmpworkers,
742  TrigResPtr(),
743  actionTable(),
744  actReg_,
746  nullptr,
748  } else {
749  empty_end_paths_.push_back(bitpos);
750  }
751  for (WorkerInPath const& workerInPath : tmpworkers) {
752  addToAllWorkers(workerInPath.getWorker());
753  }
754  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 697 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

704  {
705  PathWorkers tmpworkers;
706  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
707 
708  // an empty path will cause an extra bit that is not used
709  if (!tmpworkers.empty()) {
710  trig_paths_.emplace_back(bitpos,
711  name,
712  tmpworkers,
713  trptr,
714  actionTable(),
715  actReg_,
719  } else {
720  empty_trig_paths_.push_back(bitpos);
721  }
722  for (WorkerInPath const& workerInPath : tmpworkers) {
723  addToAllWorkers(workerInPath.getWorker());
724  }
725  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 540 of file StreamSchedule.cc.

References SiStripOfflineCRack_cfg::alias, edm::errors::Configuration, edm::Worker::description(), Exception, spr::find(), edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::getWorker(), edm::WorkerInPath::Ignore, info(), instance, edm::Worker::kFilter, mod(), callgraph::module, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::detail::processEDAliases(), dumpMFGeometry_cfg::prod, edm::ProductRegistry::productList(), FastTimerService_cff::range, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, funct::void, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

547  {
548  vstring modnames = proc_pset.getParameter<vstring>(pathName);
549  PathWorkers tmpworkers;
550 
551  //Pull out ConditionalTask modules
552  auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
553 
554  std::unordered_set<std::string> conditionalmods;
555  //An EDAlias may be redirecting to a module on a ConditionalTask
556  std::multimap<std::string, AliasInfo> aliasMap;
557  std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
558  std::unordered_map<std::string, unsigned int> conditionalModOrder;
559  if (itCondBegin != modnames.end()) {
560  for (auto it = itCondBegin + 1; it != modnames.begin() + modnames.size() - 1; ++it) {
561  // ordering needs to skip the # token in the path list
562  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
563  }
564  //the last entry should be ignored since it is required to be "@"
565  conditionalmods = std::unordered_set<std::string>(
566  std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));
567 
568  for (auto const& cond : conditionalmods) {
569  //force the creation of the conditional modules so alias check can work
570  (void)getWorker(cond, proc_pset, workerManager_, preg, prealloc, processConfiguration);
571  }
572  //find aliases
573  {
574  auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
575  std::string const star("*");
576  for (auto const& alias : aliases) {
577  auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
578  auto aliasedToModuleLabels = info.getParameterNames();
579  for (auto const& mod : aliasedToModuleLabels) {
580  if (not mod.empty() and mod[0] != '@' and conditionalmods.find(mod) != conditionalmods.end()) {
581  auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
582  for (auto const& aliasPSet : aliasVPSet) {
583  std::string type = star;
584  std::string instance = star;
585  std::string originalInstance = star;
586  if (aliasPSet.exists("type")) {
587  type = aliasPSet.getParameter<std::string>("type");
588  }
589  if (aliasPSet.exists("toProductInstance")) {
590  instance = aliasPSet.getParameter<std::string>("toProductInstance");
591  }
592  if (aliasPSet.exists("fromProductInstance")) {
593  originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
594  }
595 
596  aliasMap.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
597  }
598  }
599  }
600  }
601  }
602  //find SwitchProducers whose cases are aliases
603  {
604  auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
605  std::vector<std::string> switchEDAliases;
606  for (auto const& module : all_modules) {
607  auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
608  if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
609  auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
610  for (auto const& case_label : all_cases) {
611  auto range = aliasMap.equal_range(case_label);
612  if (range.first != range.second) {
613  switchEDAliases.push_back(case_label);
614  }
615  }
616  }
617  }
618  detail::processEDAliases(
619  switchEDAliases, conditionalmods, proc_pset, processConfiguration->processName(), preg);
620  }
621  {
622  //find branches created by the conditional modules
623  for (auto const& prod : preg.productList()) {
624  if (conditionalmods.find(prod.first.moduleLabel()) != conditionalmods.end()) {
625  conditionalModsBranches.emplace(prod.first.moduleLabel(), &prod.second);
626  }
627  }
628  }
629  }
630  modnames.erase(itCondBegin, modnames.end());
631 
632  unsigned int placeInPath = 0;
633  for (auto const& name : modnames) {
634  //Modules except EDFilters are set to run concurrently by default
635  bool doNotRunConcurrently = false;
636  WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
637  if (name[0] == '!') {
638  filterAction = WorkerInPath::Veto;
639  } else if (name[0] == '-' or name[0] == '+') {
640  filterAction = WorkerInPath::Ignore;
641  }
642  if (name[0] == '|' or name[0] == '+') {
643  //cms.wait was specified so do not run concurrently
644  doNotRunConcurrently = true;
645  }
646 
647  std::string moduleLabel = name;
648  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
649  moduleLabel.erase(0, 1);
650  }
651 
652  Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
653  if (worker == nullptr) {
654  std::string pathType("endpath");
655  if (!search_all(endPathNames, pathName)) {
656  pathType = std::string("path");
657  }
658  throw Exception(errors::Configuration)
659  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
660  << "\"\n please check spelling or remove that label from the path.";
661  }
662 
663  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
664  // We have a filter on an end path, and the filter is not explicitly ignored.
665  // See if the filter is allowed.
666  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
667  if (!search_all(allowed_filters, worker->description()->moduleName())) {
668  // Filter is not allowed. Ignore the result, and issue a warning.
669  filterAction = WorkerInPath::Ignore;
670  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
671  << "' with module label '" << moduleLabel << "' appears on EndPath '"
672  << pathName << "'.\n"
673  << "The return value of the filter will be ignored.\n"
674  << "To suppress this warning, either remove the filter from the endpath,\n"
675  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
676  }
677  }
678  bool runConcurrently = not doNotRunConcurrently;
679  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
680  runConcurrently = false;
681  }
682 
683  auto condModules = tryToPlaceConditionalModules(
684  worker, conditionalmods, conditionalModsBranches, aliasMap, proc_pset, preg, prealloc, processConfiguration);
685  for (auto condMod : condModules) {
686  tmpworkers.emplace_back(
687  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
688  }
689 
690  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
691  ++placeInPath;
692  }
693 
694  out.swap(tmpworkers);
695  }
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
static const TGPicture * info(bool iBackgroundIsBlack)
vector< string > vstring
Definition: ExoticaDQM.cc:8
static PFTauRenderPlugin instance
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
WorkerManager workerManager_
void processEDAliases(std::vector< std::string > const &aliasNamesToProcess, std::unordered_set< std::string > const &aliasModulesToProcess, ParameterSet const &proc_pset, std::string const &processName, ProductRegistry &preg)
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
static Worker * getWorker(std::string const &moduleLabel, ParameterSet &proc_pset, WorkerManager &workerManager, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Definition: plugin.cc:23
Log< level::Warning, false > LogWarning
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo info 
)
private

Definition at line 892 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

894  {
895  if (iExcept) {
896  // Caught exception is propagated via WaitingTaskHolder
897  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
902  edm::printCmsExceptionWarning("SkipEvent", e);
903  *(iExcept.load()) = std::exception_ptr();
904  } else {
905  *(iExcept.load()) = std::current_exception();
906  }
907  } catch (...) {
908  *(iExcept.load()) = std::current_exception();
909  }
910  }
911 
912  if ((not iExcept) and results_->accept()) {
913  ++total_passed_;
914  }
915 
916  if (nullptr != results_inserter_.get()) {
917  // Caught exception is propagated to the caller
918  CMS_SA_ALLOW try {
919  //Even if there was an exception, we need to allow results inserter
920  // to run since some module may be waiting on its results.
921  ParentContext parentContext(&streamContext_);
922  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
923 
924  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
925  if (expt) {
926  std::rethrow_exception(expt);
927  }
928  } catch (cms::Exception& ex) {
929  if (not iExcept) {
930  if (ex.context().empty()) {
931  std::ostringstream ost;
932  ost << "Processing Event " << info.principal().id();
933  ex.addContext(ost.str());
934  }
935  iExcept.store(new std::exception_ptr(std::current_exception()));
936  }
937  } catch (...) {
938  if (not iExcept) {
939  iExcept.store(new std::exception_ptr(std::current_exception()));
940  }
941  }
942  }
943  std::exception_ptr ptr;
944  if (iExcept) {
945  ptr = *iExcept.load();
946  }
947  iWait.doneWaiting(ptr);
948  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:165
std::list< std::string > const & context() const
Definition: Exception.cc:147
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 950 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

950  {
951  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
952 
953  if (iExcept) {
954  //add context information to the exception and print message
955  try {
956  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
957  } catch (cms::Exception& ex) {
958  bool const cleaningUpAfterException = false;
959  if (ex.context().empty()) {
960  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
961  } else {
962  addContextAndPrintException("", ex, cleaningUpAfterException);
963  }
964  iExcept = std::current_exception();
965  }
966 
967  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
968  }
969  // Caught exception is propagated to the caller
970  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
971  if (not iExcept) {
972  iExcept = std::current_exception();
973  }
974  }
975  if (not iExcept) {
977  }
978 
979  return iExcept;
980  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:147

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 778 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

778  {
779  std::vector<ModuleDescription const*> result;
780  result.reserve(allWorkers().size());
781 
782  for (auto const& worker : allWorkers()) {
783  ModuleDescription const* p = worker->description();
784  result.push_back(p);
785  }
786  return result;
787  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1104 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1104  {
1105  rep.eventSummary.totalEvents += totalEvents();
1106  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1107  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1108 
1109  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1110  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1111  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
1112  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
edm::ProductRegistry const &  preg 
)

Definition at line 238 of file StreamSchedule.cc.

References allWorkers(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

240  {
241  // setup the list with those products actually registered for this job
242  std::multimap<std::string, Worker*> branchToReadingWorker;
243  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
244 
245  const std::vector<std::string> kEmpty;
246  std::map<Worker*, unsigned int> reserveSizeForWorker;
247  unsigned int upperLimitOnReadingWorker = 0;
248  unsigned int upperLimitOnIndicies = 0;
249  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
250 
251  //talk with output modules first
252  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
253  auto comm = iHolder->createOutputModuleCommunicator();
254  if (comm) {
255  if (!branchToReadingWorker.empty()) {
256  //If an OutputModule needs a product, we can't delete it early
257  // so we should remove it from our list
258  SelectedProductsForBranchType const& kept = comm->keptProducts();
259  for (auto const& item : kept[InEvent]) {
260  BranchDescription const& desc = *item.first;
261  auto found = branchToReadingWorker.equal_range(desc.branchName());
262  if (found.first != found.second) {
263  --nUniqueBranchesToDelete;
264  branchToReadingWorker.erase(found.first, found.second);
265  }
266  }
267  }
268  }
269  });
270 
271  if (branchToReadingWorker.empty()) {
272  return;
273  }
274 
275  for (auto w : allWorkers()) {
276  //determine if this module could read a branch we want to delete early
277  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
278  if (nullptr != pset) {
279  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
280  if (not branches.empty()) {
281  ++upperLimitOnReadingWorker;
282  }
283  for (auto const& branch : branches) {
284  auto found = branchToReadingWorker.equal_range(branch);
285  if (found.first != found.second) {
286  ++upperLimitOnIndicies;
287  ++reserveSizeForWorker[w];
288  if (nullptr == found.first->second) {
289  found.first->second = w;
290  } else {
291  branchToReadingWorker.insert(make_pair(found.first->first, w));
292  }
293  }
294  }
295  }
296  }
297  {
298  auto it = branchToReadingWorker.begin();
299  std::vector<std::string> unusedBranches;
300  while (it != branchToReadingWorker.end()) {
301  if (it->second == nullptr) {
302  unusedBranches.push_back(it->first);
303  //erasing the object invalidates the iterator so must advance it first
304  auto temp = it;
305  ++it;
306  branchToReadingWorker.erase(temp);
307  } else {
308  ++it;
309  }
310  }
311  if (not unusedBranches.empty()) {
312  LogWarning l("UnusedProductsForCanDeleteEarly");
313  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
314  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
315  for (auto const& n : unusedBranches) {
316  l << "\n " << n;
317  }
318  }
319  }
320  if (!branchToReadingWorker.empty()) {
321  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
322  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
323  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
324  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
325  std::string lastBranchName;
326  size_t nextOpenIndex = 0;
327  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
328  for (auto& branchAndWorker : branchToReadingWorker) {
329  if (lastBranchName != branchAndWorker.first) {
330  //have to put back the period we removed earlier in order to get the proper name
331  BranchID bid(branchAndWorker.first + ".");
332  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
333  lastBranchName = branchAndWorker.first;
334  }
335  auto found = alreadySeenWorkers.find(branchAndWorker.second);
336  if (alreadySeenWorkers.end() == found) {
337  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
338  // all the branches that might be read by this worker. However, initially we will only tell the
339  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
340  // EarlyDeleteHelper will automatically advance its internal end pointer.
341  size_t index = nextOpenIndex;
342  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
344  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
345  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
346  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
347  nextOpenIndex += nIndices;
348  } else {
349  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
350  }
351  }
352 
353  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
354  // space needed for each module
355  auto itLast = earlyDeleteHelpers_.begin();
356  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
357  if (itLast->end() != it->begin()) {
358  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
359  unsigned int delta = it->begin() - itLast->end();
360  it->shiftIndexPointers(delta);
361 
363  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
364  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
365  }
366  itLast = it;
367  }
369  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
371 
372  //now tell the paths about the deleters
373  for (auto& p : trig_paths_) {
374  p.setEarlyDeleteHelpers(alreadySeenWorkers);
375  }
376  for (auto& p : end_paths_) {
377  p.setEarlyDeleteHelpers(alreadySeenWorkers);
378  }
380  }
381  }
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
Log< level::Warning, false > LogWarning
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1143 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1146  {
1147  int bitpos = 0;
1148  unsigned int indexEmpty = 0;
1149  unsigned int indexOfPath = 0;
1150  for (auto& pathStatusInserter : pathStatusInserters) {
1151  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1152  WorkerPtr workerPtr(
1153  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1154  pathStatusInserterWorkers_.emplace_back(workerPtr);
1155  workerPtr->setActivityRegistry(actReg_);
1156  addToAllWorkers(workerPtr.get());
1157 
1158  // A little complexity here because a C++ Path object is not
1159  // instantiated and put into end_paths if there are no modules
1160  // on the configured path.
1161  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1162  ++indexEmpty;
1163  } else {
1164  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1165  ++indexOfPath;
1166  }
1167  ++bitpos;
1168  }
1169 
1170  bitpos = 0;
1171  indexEmpty = 0;
1172  indexOfPath = 0;
1173  for (auto& endPathStatusInserter : endPathStatusInserters) {
1174  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1175  WorkerPtr workerPtr(
1176  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1177  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1178  workerPtr->setActivityRegistry(actReg_);
1179  addToAllWorkers(workerPtr.get());
1180 
1181  // A little complexity here because a C++ Path object is not
1182  // instantiated and put into end_paths if there are no modules
1183  // on the configured path.
1184  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1185  ++indexEmpty;
1186  } else {
1187  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1188  ++indexOfPath;
1189  }
1190  ++bitpos;
1191  }
1192  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1032 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1034  {
1035  descriptions.clear();
1036  bool found = false;
1037  TrigPaths::const_iterator itFound;
1038 
1039  if (hint < end_paths_.size()) {
1040  itFound = end_paths_.begin() + hint;
1041  if (itFound->name() == iEndPathLabel)
1042  found = true;
1043  }
1044  if (!found) {
1045  // if the hint did not work, do it the slow way
1046  itFound = std::find_if(
1047  end_paths_.begin(),
1048  end_paths_.end(),
1049  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1050  if (itFound != end_paths_.end())
1051  found = true;
1052  }
1053  if (found) {
1054  descriptions.reserve(itFound->size());
1055  for (size_t i = 0; i < itFound->size(); ++i) {
1056  descriptions.push_back(itFound->getWorker(i)->description());
1057  }
1058  }
1059  }
std::string const & name() const
Definition: Path.h:74

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1003 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1005  {
1006  descriptions.clear();
1007  bool found = false;
1008  TrigPaths::const_iterator itFound;
1009 
1010  if (hint < trig_paths_.size()) {
1011  itFound = trig_paths_.begin() + hint;
1012  if (itFound->name() == iPathLabel)
1013  found = true;
1014  }
1015  if (!found) {
1016  // if the hint did not work, do it the slow way
1017  itFound = std::find_if(
1018  trig_paths_.begin(),
1019  trig_paths_.end(),
1020  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1021  if (itFound != trig_paths_.end())
1022  found = true;
1023  }
1024  if (found) {
1025  descriptions.reserve(itFound->size());
1026  for (size_t i = 0; i < itFound->size(); ++i) {
1027  descriptions.push_back(itFound->getWorker(i)->description());
1028  }
1029  }
1030  }
std::string const & name() const
Definition: Path.h:74

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 990 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

990  {
991  TrigPaths::const_iterator itFound = std::find_if(
992  trig_paths_.begin(),
993  trig_paths_.end(),
994  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
995  if (itFound != trig_paths_.end()) {
996  oLabelsToFill.reserve(itFound->size());
997  for (size_t i = 0; i < itFound->size(); ++i) {
998  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
999  }
1000  }
1001  }
std::string const & name() const
Definition: Path.h:74

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 256 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 789 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

793  {
794  EventPrincipal& ep = info.principal();
795 
796  // Caught exception is propagated via WaitingTaskHolder
797  CMS_SA_ALLOW try {
798  this->resetAll();
799 
800  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
801 
802  Traits::setStreamContext(streamContext_, ep);
803  //a service may want to communicate with another service
804  ServiceRegistry::Operate guard(serviceToken);
805  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
806 
807  HLTPathStatus hltPathStatus(hlt::Pass, 0);
808  for (int empty_trig_path : empty_trig_paths_) {
809  results_->at(empty_trig_path) = hltPathStatus;
810  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
811  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
812  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
813  info, streamID_, ParentContext(&streamContext_), &streamContext_);
814  if (except) {
815  iTask.doneWaiting(except);
816  return;
817  }
818  }
819  for (int empty_end_path : empty_end_paths_) {
820  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
821  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
822  info, streamID_, ParentContext(&streamContext_), &streamContext_);
823  if (except) {
824  iTask.doneWaiting(except);
825  return;
826  }
827  }
828 
831 
832  ++total_events_;
833 
834  //use to give priorities on an error to ones from Paths
835  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
836  auto pathErrorPtr = pathErrorHolder.get();
837  ServiceWeakToken weakToken = serviceToken;
838  auto allPathsDone = make_waiting_task(
839  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
840  ServiceRegistry::Operate operate(weakToken.lock());
841 
842  std::exception_ptr ptr;
843  if (pathError->load()) {
844  ptr = *pathError->load();
845  delete pathError->load();
846  }
847  if ((not ptr) and iPtr) {
848  ptr = *iPtr;
849  }
850  iTask.doneWaiting(finishProcessOneEvent(ptr));
851  });
852  //The holder guarantees that if the paths finish before the loop ends
853  // that we do not start too soon. It also guarantees that the task will
854  // run under that condition.
855  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
856 
857  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
858  std::exception_ptr const* iPtr) mutable {
859  ServiceRegistry::Operate operate(weakToken.lock());
860 
861  if (iPtr) {
862  //this is used to prioritize this error over one
863  // that happens in EndPath or Accumulate
864  pathErrorPtr->store(new std::exception_ptr(*iPtr));
865  }
866  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
867  });
868 
869  //The holder guarantees that if the paths finish before the loop ends
870  // that we do not start too soon. It also guarantees that the task will
871  // run under that condition.
872  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
873 
874  //start end paths first so on single threaded the paths will run first
875  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
876  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
877  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
878  }
879 
880  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
881  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
882  }
883 
884  ParentContext parentContext(&streamContext_);
885  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
886  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
887  } catch (...) {
888  iTask.doneWaiting(std::current_exception());
889  }
890  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 387 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, triggerObjects_cff::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

390  {
391  auto const& principal = transitionInfo.principal();
392  T::setStreamContext(streamContext_, principal);
393 
394  auto id = principal.id();
395  ServiceWeakToken weakToken = token;
396  auto doneTask = make_waiting_task(
397  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
398  std::exception_ptr excpt;
399  if (iPtr) {
400  excpt = *iPtr;
401  //add context information to the exception and print message
402  try {
403  convertException::wrap([&]() { std::rethrow_exception(excpt); });
404  } catch (cms::Exception& ex) {
405  //TODO: should add the transition type info
406  std::ostringstream ost;
407  if (ex.context().empty()) {
408  ost << "Processing " << T::transitionName() << " " << id;
409  }
410  ServiceRegistry::Operate op(weakToken.lock());
411  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
412  excpt = std::current_exception();
413  }
414 
415  ServiceRegistry::Operate op(weakToken.lock());
416  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
417  }
418  // Caught exception is propagated via WaitingTaskHolder
419  CMS_SA_ALLOW try {
420  ServiceRegistry::Operate op(weakToken.lock());
421  T::postScheduleSignal(actReg_.get(), &streamContext_);
422  } catch (...) {
423  if (not excpt) {
424  excpt = std::current_exception();
425  }
426  }
427  iHolder.doneWaiting(excpt);
428  });
429 
430  auto task = make_functor_task(
431  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
432  auto token = weakToken.lock();
434  // Caught exception is propagated via WaitingTaskHolder
435  CMS_SA_ALLOW try {
436  T::preScheduleSignal(actReg_.get(), &streamContext_);
437 
439  } catch (...) {
440  h.doneWaiting(std::current_exception());
441  return;
442  }
443 
444  for (auto& p : end_paths_) {
445  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
446  }
447 
448  for (auto& p : trig_paths_) {
449  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
450  }
451 
453  });
454 
455  if (streamID_.value() == 0) {
456  //Enqueueing will start another thread if there is only
457  // one thread in the job. Having stream == 0 use spawn
458  // avoids starting up another thread when there is only one stream.
459  iHolder.group()->run([task]() {
460  TaskSentry s{task};
461  task->execute();
462  });
463  } else {
464  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
465  arena.enqueue([task]() {
466  TaskSentry s{task};
467  task->execute();
468  });
469  }
470  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

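Replace the module used by the worker with label iLabel with the one held by iMod, then call beginStream on that worker. Does nothing if no worker has the label iLabel.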

Definition at line 760 of file StreamSchedule.cc.

References allWorkers(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

760  {
761  Worker* found = nullptr;
762  for (auto const& worker : allWorkers()) {
763  if (worker->description()->moduleLabel() == iLabel) {
764  found = worker;
765  break;
766  }
767  }
768  if (nullptr == found) {
769  return;
770  }
771 
772  iMod->replaceModuleFor(found);
773  found->beginStream(streamID_, streamContext_);
774  }
StreamContext streamContext_
AllWorkers const & allWorkers() const
returns the collection of pointers to workers

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 381 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

// Body of reportSkipped(ep): passes the run and event numbers of ep to
// reportSkippedEvent() on the JobReport service.
381  {
382  Service<JobReport> reportSvc;
383  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
384  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1122 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

// Body of resetAll(): clears the skipping flag and calls reset() on the
// object held by results_.
1122  {
1123  skippingEvent_ = false;
1124  results_->reset();
1125  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1129 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

// Body of resetEarlyDelete(): recomputes the per-branch counts from the
// helper-to-branch index list and then resets every early-delete helper.
1129  {
1130  //must be sure we have cleared the count first
1131  for (auto& count : earlyDeleteBranchToCount_) {
1132  count.count = 0;
1133  }
1134  //now reset based on how many helpers use that branch
// Each entry of earlyDeleteHelperToBranchIndicies_ is an index into
// earlyDeleteBranchToCount_; one increment per entry.
1135  for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1136  ++(earlyDeleteBranchToCount_[index].count);
1137  }
1138  for (auto& helper : earlyDeleteHelpers_) {
1139  helper.reset();
1140  }
1141  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 335 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

// Const overload: returns results_ through get_underlying_safe().
335 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 336 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

// Non-const overload: returns a reference to the pointer held in results_ through get_underlying_safe().
336 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

// Returns the stored streamID_ member.
199 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

// Returns the total_events_ counter.
226 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 234 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

// Derived value: all events minus those that passed.
234 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

// Returns the total_passed_ counter.
230 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker * worker,
std::unordered_set< std::string > &  conditionalModules,
std::multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet & proc_pset,
ProductRegistry & preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 399 of file StreamSchedule.cc.

References cms::cuda::assert(), MicroEventContent_cff::branch, edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), edm::ELEMENT_TYPE, edm::getWorker(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, edm::productholderindexhelper::typeIsViewCompatible(), and workerManager_.

Referenced by fillWorkers().

407  {
408  std::vector<Worker*> returnValue;
409  auto const& consumesInfo = worker->consumesInfo();
410  auto moduleLabel = worker->description()->moduleLabel();
411  using namespace productholderindexhelper;
412  for (auto const& ci : consumesInfo) {
413  if (not ci.skipCurrentProcess() and
414  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
415  auto productModuleLabel = ci.label();
416  if (productModuleLabel.empty()) {
417  //this is a consumesMany request
418  for (auto const& branch : conditionalModuleBranches) {
419  //check that the conditional module has not been used
420  if (conditionalModules.find(branch.first) == conditionalModules.end()) {
421  continue;
422  }
423  if (ci.kindOfType() == edm::PRODUCT_TYPE) {
424  if (branch.second->unwrappedTypeID() != ci.type()) {
425  continue;
426  }
427  } else {
428  if (not typeIsViewCompatible(
429  ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
430  continue;
431  }
432  }
433 
434  auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration);
435  assert(condWorker);
436 
437  conditionalModules.erase(branch.first);
438 
439  auto dependents = tryToPlaceConditionalModules(condWorker,
440  conditionalModules,
441  conditionalModuleBranches,
442  aliasMap,
443  proc_pset,
444  preg,
445  prealloc,
446  processConfiguration);
447  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
448  returnValue.push_back(condWorker);
449  }
450  } else {
451  //just a regular consumes
452  bool productFromConditionalModule = false;
453  auto itFound = conditionalModules.find(productModuleLabel);
454  if (itFound == conditionalModules.end()) {
455  //Check to see if this was an alias
456  auto findAlias = aliasMap.equal_range(productModuleLabel);
457  for (auto it = findAlias.first; it != findAlias.second; ++it) {
458  //this was previously filtered so only the conditional modules remain
459  productModuleLabel = it->second.originalModuleLabel;
460  if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
461  if (it->second.friendlyClassName == "*" or
462  (ci.type().friendlyClassName() == it->second.friendlyClassName)) {
463  productFromConditionalModule = true;
464  //need to check the rest of the data product info
465  break;
466  } else if (ci.kindOfType() == ELEMENT_TYPE) {
467  //consume is a View so need to do more intrusive search
468  //find matching branches in module
469  auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
470  for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
471  if (it->second.originalInstanceLabel == "*" or
472  itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
473  if (typeIsViewCompatible(ci.type(),
474  TypeID(itBranch->second->wrappedType().typeInfo()),
475  itBranch->second->className())) {
476  productFromConditionalModule = true;
477  break;
478  }
479  }
480  }
481  if (productFromConditionalModule) {
482  break;
483  }
484  }
485  }
486  }
487  if (productFromConditionalModule) {
488  itFound = conditionalModules.find(productModuleLabel);
489  //check that the alias-for conditional module has not been used
490  if (itFound == conditionalModules.end()) {
491  continue;
492  }
493  }
494  } else {
495  //need to check the rest of the data product info
496  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
497  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
498  if (itBranch->second->productInstanceName() == ci.instance()) {
499  if (ci.kindOfType() == PRODUCT_TYPE) {
500  if (ci.type() == itBranch->second->unwrappedTypeID()) {
501  productFromConditionalModule = true;
502  break;
503  }
504  } else {
505  //this is a view
506  if (typeIsViewCompatible(ci.type(),
507  TypeID(itBranch->second->wrappedType().typeInfo()),
508  itBranch->second->className())) {
509  productFromConditionalModule = true;
510  break;
511  }
512  }
513  }
514  }
515  }
516  if (productFromConditionalModule) {
517  auto condWorker =
518  getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
519  assert(condWorker);
520 
521  conditionalModules.erase(itFound);
522 
523  auto dependents = tryToPlaceConditionalModules(condWorker,
524  conditionalModules,
525  conditionalModuleBranches,
526  aliasMap,
527  proc_pset,
528  preg,
529  prealloc,
530  processConfiguration);
531  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
532  returnValue.push_back(condWorker);
533  }
534  }
535  }
536  }
537  return returnValue;
538  }
assert(be >=bs)
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
WorkerManager workerManager_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
static Worker * getWorker(std::string const &moduleLabel, ParameterSet &proc_pset, WorkerManager &workerManager, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 370 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 367 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 354 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 350 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 374 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 349 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 346 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 348 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ skippingEvent_

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 378 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 372 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 373 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManager_

WorkerManager edm::StreamSchedule::workerManager_
private