CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkersBeginEnd () const
 returns the collection of pointers to workers More...
 
AllWorkers const & allWorkersLumisAndEvents () const
 
AllWorkers const & allWorkersRuns () const
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkersLumisAndEvents () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void handleException (StreamContext const &, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
template<typename T >
void postScheduleSignal (StreamContext const *, ServiceWeakToken const &, std::exception_ptr &) const noexcept
 
template<typename T >
void preScheduleSignal (StreamContext const *) const
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManagerBeginEnd_
 
WorkerManager workerManagerLumisAndEvents_
 
WorkerManager workerManagerRuns_
 

Detailed Description

Definition at line 122 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 130 of file StreamSchedule.h.

◆ NonTrigPaths

Definition at line 126 of file StreamSchedule.h.

◆ PathWorkers

Definition at line 134 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 125 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 128 of file StreamSchedule.h.

◆ TrigResPtr

Definition at line 127 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 124 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 129 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 132 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 349 of file StreamSchedule.cc.

References actions, actReg_, edm::WorkerManager::addToAllWorkers(), addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkersLumisAndEvents(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, dqm-mbProfile::log, makePathStatusInserters(), HerwigMaxPtPartonFilter_cfi::moduleLabel, number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, edm::StreamID::value(), workerManagerBeginEnd_, workerManagerLumisAndEvents_, and workerManagerRuns_.

363  : workerManagerBeginEnd_(modReg, areg, actions),
364  workerManagerRuns_(modReg, areg, actions),
365  workerManagerLumisAndEvents_(modReg, areg, actions),
366  actReg_(areg),
367  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
369  trig_paths_(),
370  end_paths_(),
371  total_events_(),
372  total_passed_(),
375  streamContext_(streamID_, processContext) {
376  bool hasPath = false;
377  std::vector<std::string> const& pathNames = tns.getTrigPaths();
378  std::vector<std::string> const& endPathNames = tns.getEndPaths();
379 
380  ConditionalTaskHelper conditionalTaskHelper(
381  proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
382  std::unordered_set<std::string> conditionalModules;
383 
384  int trig_bitpos = 0;
385  trig_paths_.reserve(pathNames.size());
386  for (auto const& trig_name : pathNames) {
387  fillTrigPath(proc_pset,
388  preg,
389  &prealloc,
390  processConfiguration,
391  trig_bitpos,
392  trig_name,
393  results(),
394  endPathNames,
395  conditionalTaskHelper,
396  conditionalModules);
397  ++trig_bitpos;
398  hasPath = true;
399  }
400 
401  if (hasPath) {
402  // the results inserter stands alone
403  inserter->setTrigResultForStream(streamID.value(), results());
404 
405  results_inserter_ = makeInserter(actions, actReg_, inserter);
407  }
408 
409  // fill normal endpaths
410  int bitpos = 0;
411  end_paths_.reserve(endPathNames.size());
412  for (auto const& end_path_name : endPathNames) {
413  fillEndPath(proc_pset,
414  preg,
415  &prealloc,
416  processConfiguration,
417  bitpos,
418  end_path_name,
419  endPathNames,
420  conditionalTaskHelper,
421  conditionalModules);
422  ++bitpos;
423  }
424 
425  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
426 
427  //See if all modules were used
428  std::set<std::string> usedWorkerLabels;
429  for (auto const& worker : allWorkersLumisAndEvents()) {
430  usedWorkerLabels.insert(worker->description()->moduleLabel());
431  }
432  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
433  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
434  std::vector<std::string> unusedLabels;
435  set_difference(modulesInConfigSet.begin(),
436  modulesInConfigSet.end(),
437  usedWorkerLabels.begin(),
438  usedWorkerLabels.end(),
439  back_inserter(unusedLabels));
440  std::set<std::string> unscheduledLabels;
441  std::vector<std::string> shouldBeUsedLabels;
442  if (!unusedLabels.empty()) {
443  //Need to
444  // 1) create worker
445  // 2) if it is a WorkerT<EDProducer>, add it to our list
446  // 3) hand list to our delayed reader
447  for (auto const& label : unusedLabels) {
448  bool isTracked;
449  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
450  assert(isTracked);
451  assert(modulePSet != nullptr);
453  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
454  }
455  if (!shouldBeUsedLabels.empty()) {
456  std::ostringstream unusedStream;
457  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
458  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
459  itLabelEnd = shouldBeUsedLabels.end();
460  itLabel != itLabelEnd;
461  ++itLabel) {
462  unusedStream << ",'" << *itLabel << "'";
463  }
464  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
465  }
466  }
467  number_of_unscheduled_modules_ = unscheduledLabels.size();
468 
469  // Print conditional modules that were not consumed in any of their associated Paths
470  if (streamID.value() == 0 and not conditionalModules.empty()) {
471  // Intersection of unscheduled and ConditionalTask modules gives
472  // directly the set of conditional modules that were not
473  // consumed by anything in the Paths associated to the
474  // corresponding ConditionalTask.
475  std::vector<std::string_view> labelsToPrint;
476  std::copy_if(
477  unscheduledLabels.begin(),
478  unscheduledLabels.end(),
479  std::back_inserter(labelsToPrint),
480  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
481 
482  if (not labelsToPrint.empty()) {
483  edm::LogWarning log("NonConsumedConditionalModules");
484  log << "The following modules were part of some ConditionalTask, but were not\n"
485  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
486  << "was associated. Perhaps they should be either removed from the\n"
487  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
488  for (auto const& modLabel : labelsToPrint) {
489  log.format("\n {}", modLabel);
490  }
491  }
492  }
493 
494  for (auto const& worker : allWorkersLumisAndEvents()) {
495  std::string const& moduleLabel = worker->description()->moduleLabel();
496 
497  // The new worker pointers will be null for the TriggerResultsInserter, PathStatusInserter, and
498  // EndPathStatusInserter because there are no ParameterSets for those in the configuration.
499  // We could add special code to create workers for those, but instead we skip them because they
500  // do not have beginStream, endStream, or run/lumi begin/end stream transition functions.
501 
502  Worker* workerBeginEnd =
503  getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
504  if (workerBeginEnd) {
505  workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
506  }
507 
508  Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
509  if (workerRuns) {
511  }
512  }
513 
514  } // StreamSchedule::StreamSchedule
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
WorkerManager workerManagerBeginEnd_
unsigned int number_of_unscheduled_modules_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
WorkerManager workerManagerRuns_
TrigResConstPtr results() const
StreamContext streamContext_
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
AllWorkers const & allWorkersLumisAndEvents() const
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
unsigned int value() const
Definition: StreamID.h:43
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
WorkerManager workerManagerLumisAndEvents_
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
void addToAllWorkers(Worker *w)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 244 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManagerLumisAndEvents_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:85
WorkerManager workerManagerLumisAndEvents_

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

◆ allWorkersBeginEnd()

AllWorkers const& edm::StreamSchedule::allWorkersBeginEnd ( ) const
inline

returns the collection of pointers to workers

Definition at line 224 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManagerBeginEnd_.

Referenced by replaceModule().

WorkerManager workerManagerBeginEnd_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:80

◆ allWorkersLumisAndEvents()

AllWorkers const& edm::StreamSchedule::allWorkersLumisAndEvents ( ) const
inline

◆ allWorkersRuns()

AllWorkers const& edm::StreamSchedule::allWorkersRuns ( ) const
inline

Definition at line 225 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManagerRuns_.

Referenced by replaceModule().

225 { return workerManagerRuns_.allWorkers(); }
WorkerManager workerManagerRuns_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:80

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1228 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1228  {
1229  oLabelsToFill.reserve(trig_paths_.size());
1230  std::transform(trig_paths_.begin(),
1231  trig_paths_.end(),
1232  std::back_inserter(oLabelsToFill),
1233  std::bind(&Path::name, std::placeholders::_1));
1234  }
std::string const & name() const
Definition: Path.h:66
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 985 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManagerBeginEnd_.

WorkerManager workerManagerBeginEnd_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1360 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1360  {
1361  using std::placeholders::_1;
1363  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1364  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1366  }
void clearCounters() noexcept
Definition: Worker.h:235
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkersLumisAndEvents() const
void clearCounters()
Definition: Path.cc:183

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 233 of file StreamSchedule.h.

References streamContext_.

233 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 1013 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), workerManagerBeginEnd_, workerManagerLumisAndEvents_, and workerManagerRuns_.

1013  {
1017  }
void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManagerBeginEnd_
WorkerManager workerManagerRuns_
WorkerManager workerManagerLumisAndEvents_

◆ endStream()

void edm::StreamSchedule::endStream ( )

Definition at line 987 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManagerBeginEnd_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManagerBeginEnd_
StreamContext streamContext_

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 947 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, Skims_PA_cff::name, and streamContext_.

Referenced by StreamSchedule().

955  {
956  PathWorkers tmpworkers;
957  fillWorkers(proc_pset,
958  preg,
959  prealloc,
960  processConfiguration,
961  name,
962  true,
963  tmpworkers,
964  endPathNames,
965  conditionalTaskHelper,
966  allConditionalModules);
967 
968  if (!tmpworkers.empty()) {
969  end_paths_.emplace_back(bitpos,
970  name,
971  tmpworkers,
972  TrigResPtr(),
973  actionTable(),
974  actReg_,
977  } else {
978  empty_end_paths_.push_back(bitpos);
979  }
980  for (WorkerInPath const& workerInPath : tmpworkers) {
981  addToAllWorkers(workerInPath.getWorker());
982  }
983  }
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 913 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, Skims_PA_cff::name, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

922  {
923  PathWorkers tmpworkers;
924  fillWorkers(proc_pset,
925  preg,
926  prealloc,
927  processConfiguration,
928  name,
929  false,
930  tmpworkers,
931  endPathNames,
932  conditionalTaskHelper,
933  allConditionalModules);
934 
935  // an empty path will cause an extra bit that is not used
936  if (!tmpworkers.empty()) {
937  trig_paths_.emplace_back(
938  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
939  } else {
940  empty_trig_paths_.push_back(bitpos);
941  }
942  for (WorkerInPath const& workerInPath : tmpworkers) {
943  addToAllWorkers(workerInPath.getWorker());
944  }
945  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 805 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), Skims_PA_cff::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManagerLumisAndEvents_.

Referenced by fillEndPath(), and fillTrigPath().

814  {
815  vstring modnames = proc_pset.getParameter<vstring>(pathName);
816  PathWorkers tmpworkers;
817 
818  //Pull out ConditionalTask modules
819  auto condRange = findConditionalTaskModulesRange(modnames);
820 
821  std::unordered_set<std::string> conditionalmods;
822  //An EDAlias may be redirecting to a module on a ConditionalTask
823  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
824  std::unordered_map<std::string, unsigned int> conditionalModOrder;
825  if (condRange.first != condRange.second) {
826  for (auto it = condRange.first; it != condRange.second; ++it) {
827  // ordering needs to skip the # token in the path list
828  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
829  }
830  //the last entry should be ignored since it is required to be "@"
831  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
832  std::make_move_iterator(condRange.second));
833 
834  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
835  modnames.erase(std::prev(condRange.first), modnames.end());
836 
837  // Make a union of all conditional modules from all Paths
838  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
839  }
840 
841  unsigned int placeInPath = 0;
842  for (auto const& name : modnames) {
843  //Modules except EDFilters are set to run concurrently by default
844  bool doNotRunConcurrently = false;
846  if (name[0] == '!') {
847  filterAction = WorkerInPath::Veto;
848  } else if (name[0] == '-' or name[0] == '+') {
849  filterAction = WorkerInPath::Ignore;
850  }
851  if (name[0] == '|' or name[0] == '+') {
852  //cms.wait was specified so do not run concurrently
853  doNotRunConcurrently = true;
854  }
855 
857  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
858  moduleLabel.erase(0, 1);
859  }
860 
861  Worker* worker =
862  getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
863  if (worker == nullptr) {
864  std::string pathType("endpath");
865  if (!search_all(endPathNames, pathName)) {
866  pathType = std::string("path");
867  }
869  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
870  << "\"\n please check spelling or remove that label from the path.";
871  }
872 
873  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
874  // We have a filter on an end path, and the filter is not explicitly ignored.
875  // See if the filter is allowed.
876  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
877  if (!search_all(allowed_filters, worker->description()->moduleName())) {
878  // Filter is not allowed. Ignore the result, and issue a warning.
879  filterAction = WorkerInPath::Ignore;
880  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
881  << "' with module label '" << moduleLabel << "' appears on EndPath '"
882  << pathName << "'.\n"
883  << "The return value of the filter will be ignored.\n"
884  << "To suppress this warning, either remove the filter from the endpath,\n"
885  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
886  }
887  }
888  bool runConcurrently = not doNotRunConcurrently;
889  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
890  runConcurrently = false;
891  }
892 
893  auto condModules = tryToPlaceConditionalModules(worker,
894  conditionalmods,
895  conditionalModsBranches,
896  conditionalTaskHelper.aliasMap(),
897  proc_pset,
898  preg,
899  prealloc,
900  processConfiguration);
901  for (auto condMod : condModules) {
902  tmpworkers.emplace_back(
903  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
904  }
905 
906  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
907  ++placeInPath;
908  }
909 
910  out.swap(tmpworkers);
911  }
vector< string > vstring
Definition: ExoticaDQM.cc:7
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning
WorkerManager workerManagerLumisAndEvents_

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo info 
)
private

Definition at line 1139 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, streamContext_, streamID_, total_passed_, and edm::exception_actions::TryToContinue.

Referenced by processOneEventAsync().

1141  {
1142  if (iExcept) {
1143  // Caught exception is propagated via WaitingTaskHolder
1144  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1148  edm::printCmsExceptionWarning("TryToContinue", e);
1149  *(iExcept.load()) = std::exception_ptr();
1150  } else {
1151  *(iExcept.load()) = std::current_exception();
1152  }
1153  } catch (...) {
1154  *(iExcept.load()) = std::current_exception();
1155  }
1156  }
1157 
1158  if ((not iExcept) and results_->accept()) {
1159  ++total_passed_;
1160  }
1161 
1162  if (nullptr != results_inserter_.get()) {
1163  // Caught exception is propagated to the caller
1164  CMS_SA_ALLOW try {
1165  //Even if there was an exception, we need to allow results inserter
1166  // to run since some module may be waiting on its results.
1167  ParentContext parentContext(&streamContext_);
1168  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1169 
1170  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1171  if (expt) {
1172  std::rethrow_exception(expt);
1173  }
1174  } catch (cms::Exception& ex) {
1175  if (not iExcept) {
1176  if (ex.context().empty()) {
1177  std::ostringstream ost;
1178  ost << "Processing Event " << info.principal().id();
1179  ex.addContext(ost.str());
1180  }
1181  iExcept.store(new std::exception_ptr(std::current_exception()));
1182  }
1183  } catch (...) {
1184  if (not iExcept) {
1185  iExcept.store(new std::exception_ptr(std::current_exception()));
1186  }
1187  }
1188  }
1189  std::exception_ptr ptr;
1190  if (iExcept) {
1191  ptr = *iExcept.load();
1192  }
1193  iWait.doneWaiting(ptr);
1194  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:169
std::list< std::string > const & context() const
Definition: Exception.cc:151
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1196 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1196  {
1197  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1198 
1199  if (iExcept) {
1200  //add context information to the exception and print message
1201  try {
1202  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1203  } catch (cms::Exception& ex) {
1204  bool const cleaningUpAfterException = false;
1205  if (ex.context().empty()) {
1206  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1207  } else {
1208  addContextAndPrintException("", ex, cleaningUpAfterException);
1209  }
1210  iExcept = std::current_exception();
1211  }
1212 
1213  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1214  }
1215  // Caught exception is propagated to the caller
1216  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1217  if (not iExcept) {
1218  iExcept = std::current_exception();
1219  }
1220  }
1221  if (not iExcept) {
1222  resetEarlyDelete();
1223  }
1224 
1225  return iExcept;
1226  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1019 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), AlCaHLTBitMon_ParallelJobs::p, and mps_fire::result.

1019  {
1020  std::vector<ModuleDescription const*> result;
1021  result.reserve(allWorkersLumisAndEvents().size());
1022 
1023  for (auto const& worker : allWorkersLumisAndEvents()) {
1024  ModuleDescription const* p = worker->description();
1025  result.push_back(p);
1026  }
1027  return result;
1028  }
size
Write out results.
AllWorkers const & allWorkersLumisAndEvents() const

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1350 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1350  {
1351  rep.eventSummary.totalEvents += totalEvents();
1352  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1353  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1354 
1355  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1356  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1357  fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1358  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
AllWorkers const & allWorkersLumisAndEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)

◆ handleException()

void edm::StreamSchedule::handleException ( StreamContext const &  streamContext,
ServiceWeakToken const &  weakToken,
bool  cleaningUpAfterException,
std::exception_ptr &  excpt 
) const
privatenoexcept

Definition at line 1437 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::exceptionContext(), edm::ExceptionFromThisContext, findAndChange::op, and edm::convertException::wrap().

Referenced by processOneStreamAsync().

1440  {
1441  //add context information to the exception and print message
1442  try {
1443  convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1444  } catch (cms::Exception& ex) {
1445  std::ostringstream ost;
1446  // In most cases the exception will already have context at this point,
1447  // but add some context here in those rare cases where it does not.
1448  if (ex.context().empty()) {
1449  exceptionContext(ost, streamContext);
1450  }
1451  ServiceRegistry::Operate op(weakToken.lock());
1452  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1453  excpt = std::current_exception();
1454  }
1455  // We are already handling an earlier exception, so ignore it
1456  // if this signal results in another exception being thrown.
1457  CMS_SA_ALLOW try {
1458  ServiceRegistry::Operate op(weakToken.lock());
1459  actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1460  } catch (...) {
1461  }
1462  }
#define CMS_SA_ALLOW
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 516 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), cms::cuda::assert(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), dqm-mbProfile::format, newFWLiteAna::found, edm::InEvent, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, B2GTnPMonitor_cfi::item, ALPAKA_ACCELERATOR_NAMESPACE::pixelClustering::pixelStatus::kEmpty, MainPageGenerator::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

520  {
521  // setup the list with those products actually registered for this job
522  std::multimap<std::string, Worker*> branchToReadingWorker;
523  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
524 
525  const std::vector<std::string> kEmpty;
526  std::map<Worker*, unsigned int> reserveSizeForWorker;
527  unsigned int upperLimitOnReadingWorker = 0;
528  unsigned int upperLimitOnIndicies = 0;
529  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
530 
531  //talk with output modules first
532  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
533  auto comm = iHolder->createOutputModuleCommunicator();
534  if (comm) {
535  if (!branchToReadingWorker.empty()) {
536  //If an OutputModule needs a product, we can't delete it early
537  // so we should remove it from our list
538  SelectedProductsForBranchType const& kept = comm->keptProducts();
539  for (auto const& item : kept[InEvent]) {
540  BranchDescription const& desc = *item.first;
541  auto found = branchToReadingWorker.equal_range(desc.branchName());
542  if (found.first != found.second) {
543  --nUniqueBranchesToDelete;
544  branchToReadingWorker.erase(found.first, found.second);
545  }
546  }
547  }
548  }
549  });
550 
551  if (branchToReadingWorker.empty()) {
552  return;
553  }
554 
555  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
556  for (auto w : allWorkersLumisAndEvents()) {
557  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
558  continue;
559  }
560  //determine if this module could read a branch we want to delete early
561  auto consumes = w->consumesInfo();
562  if (not consumes.empty()) {
563  bool foundAtLeastOneMatchingBranch = false;
564  for (auto const& product : consumes) {
565  std::string branch = fmt::format("{}_{}_{}_{}",
566  product.type().friendlyClassName(),
567  product.label().data(),
568  product.instance().data(),
569  product.process().data());
570  {
571  //Handle case where worker directly consumes product
572  auto found = branchToReadingWorker.end();
573  if (product.process().empty()) {
574  auto startFound = branchToReadingWorker.lower_bound(branch);
575  if (startFound != branchToReadingWorker.end()) {
576  if (startFound->first.substr(0, branch.size()) == branch) {
577  //match all processNames here, even if it means multiple matches will happen
578  found = startFound;
579  }
580  }
581  } else {
582  auto exactFound = branchToReadingWorker.equal_range(branch);
583  if (exactFound.first != exactFound.second) {
584  found = exactFound.first;
585  }
586  }
587  if (found != branchToReadingWorker.end()) {
588  if (not foundAtLeastOneMatchingBranch) {
589  ++upperLimitOnReadingWorker;
590  foundAtLeastOneMatchingBranch = true;
591  }
592  ++upperLimitOnIndicies;
593  ++reserveSizeForWorker[w];
594  if (nullptr == found->second) {
595  found->second = w;
596  } else {
597  branchToReadingWorker.insert(make_pair(found->first, w));
598  }
599  }
600  }
601  {
602  //Handle case where indirectly consumes product
603  auto found = referencesToBranches.end();
604  if (product.process().empty()) {
605  auto startFound = referencesToBranches.lower_bound(branch);
606  if (startFound != referencesToBranches.end()) {
607  if (startFound->first.substr(0, branch.size()) == branch) {
608  //match all processNames here, even if it means multiple matches will happen
609  found = startFound;
610  }
611  }
612  } else {
613  //can match exactly
614  auto exactFound = referencesToBranches.equal_range(branch);
615  if (exactFound.first != exactFound.second) {
616  found = exactFound.first;
617  }
618  }
619  if (found != referencesToBranches.end()) {
620  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
621  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
622  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
623  continue;
624  }
625  if (not foundAtLeastOneMatchingBranch) {
626  ++upperLimitOnReadingWorker;
627  foundAtLeastOneMatchingBranch = true;
628  }
629  ++upperLimitOnIndicies;
630  ++reserveSizeForWorker[w];
631  if (nullptr == foundInBranchToReadingWorker->second) {
632  foundInBranchToReadingWorker->second = w;
633  } else {
634  branchToReadingWorker.insert(make_pair(itr->second, w));
635  }
636  }
637  }
638  }
639  }
640  }
641  }
642  {
643  auto it = branchToReadingWorker.begin();
644  std::vector<std::string> unusedBranches;
645  while (it != branchToReadingWorker.end()) {
646  if (it->second == nullptr) {
647  unusedBranches.push_back(it->first);
648  //erasing the object invalidates the iterator so must advance it first
649  auto temp = it;
650  ++it;
651  branchToReadingWorker.erase(temp);
652  } else {
653  ++it;
654  }
655  }
656  if (not unusedBranches.empty()) {
657  LogWarning l("UnusedProductsForCanDeleteEarly");
658  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
659  " If possible, remove the producer from the job.";
660  for (auto const& n : unusedBranches) {
661  l << "\n " << n;
662  }
663  }
664  }
665  if (!branchToReadingWorker.empty()) {
666  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
667  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
668  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
669  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
670  std::string lastBranchName;
671  size_t nextOpenIndex = 0;
672  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
673  for (auto& branchAndWorker : branchToReadingWorker) {
674  if (lastBranchName != branchAndWorker.first) {
675  //have to put back the period we removed earlier in order to get the proper name
676  BranchID bid(branchAndWorker.first + ".");
677  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
678  lastBranchName = branchAndWorker.first;
679  }
680  auto found = alreadySeenWorkers.find(branchAndWorker.second);
681  if (alreadySeenWorkers.end() == found) {
682  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
683  // all the branches that might be read by this worker. However, initially we will only tell the
684  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
685  // EarlyDeleteHelper will automatically advance its internal end pointer.
686  size_t index = nextOpenIndex;
687  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
690  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
691  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
692  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
693  nextOpenIndex += nIndices;
694  } else {
695  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
696  }
697  }
698 
699  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
700  // space needed for each module
701  auto itLast = earlyDeleteHelpers_.begin();
702  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
703  if (itLast->end() != it->begin()) {
704  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
705  unsigned int delta = it->begin() - itLast->end();
706  it->shiftIndexPointers(delta);
707 
709  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
710  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
711  }
712  itLast = it;
713  }
715  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
717 
718  //now tell the paths about the deleters
719  for (auto& p : trig_paths_) {
720  p.setEarlyDeleteHelpers(alreadySeenWorkers);
721  }
722  for (auto& p : end_paths_) {
723  p.setEarlyDeleteHelpers(alreadySeenWorkers);
724  }
726  }
727  }
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
assert(be >=bs)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkersLumisAndEvents() const
Log< level::Warning, false > LogWarning
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1386 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1389  {
1390  int bitpos = 0;
1391  unsigned int indexEmpty = 0;
1392  unsigned int indexOfPath = 0;
1393  for (auto& pathStatusInserter : pathStatusInserters) {
1394  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1395  WorkerPtr workerPtr(
1396  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1397  pathStatusInserterWorkers_.emplace_back(workerPtr);
1398  workerPtr->setActivityRegistry(actReg_);
1399  addToAllWorkers(workerPtr.get());
1400 
1401  // A little complexity here because a C++ Path object is not
1402  // instantiated and put into end_paths if there are no modules
1403  // on the configured path.
1404  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1405  ++indexEmpty;
1406  } else {
1407  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1408  ++indexOfPath;
1409  }
1410  ++bitpos;
1411  }
1412 
1413  bitpos = 0;
1414  indexEmpty = 0;
1415  indexOfPath = 0;
1416  for (auto& endPathStatusInserter : endPathStatusInserters) {
1417  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1418  WorkerPtr workerPtr(
1419  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1420  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1421  workerPtr->setActivityRegistry(actReg_);
1422  addToAllWorkers(workerPtr.get());
1423 
1424  // A little complexity here because a C++ Path object is not
1425  // instantiated and put into end_paths if there are no modules
1426  // on the configured path.
1427  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1428  ++indexEmpty;
1429  } else {
1430  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1431  ++indexOfPath;
1432  }
1433  ++bitpos;
1434  }
1435  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1278 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1280  {
1281  descriptions.clear();
1282  bool found = false;
1283  TrigPaths::const_iterator itFound;
1284 
1285  if (hint < end_paths_.size()) {
1286  itFound = end_paths_.begin() + hint;
1287  if (itFound->name() == iEndPathLabel)
1288  found = true;
1289  }
1290  if (!found) {
1291  // if the hint did not work, do it the slow way
1292  itFound = std::find_if(
1293  end_paths_.begin(),
1294  end_paths_.end(),
1295  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1296  if (itFound != end_paths_.end())
1297  found = true;
1298  }
1299  if (found) {
1300  descriptions.reserve(itFound->size());
1301  for (size_t i = 0; i < itFound->size(); ++i) {
1302  descriptions.push_back(itFound->getWorker(i)->description());
1303  }
1304  }
1305  }
std::string const & name() const
Definition: Path.h:66

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1249 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1251  {
1252  descriptions.clear();
1253  bool found = false;
1254  TrigPaths::const_iterator itFound;
1255 
1256  if (hint < trig_paths_.size()) {
1257  itFound = trig_paths_.begin() + hint;
1258  if (itFound->name() == iPathLabel)
1259  found = true;
1260  }
1261  if (!found) {
1262  // if the hint did not work, do it the slow way
1263  itFound = std::find_if(
1264  trig_paths_.begin(),
1265  trig_paths_.end(),
1266  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1267  if (itFound != trig_paths_.end())
1268  found = true;
1269  }
1270  if (found) {
1271  descriptions.reserve(itFound->size());
1272  for (size_t i = 0; i < itFound->size(); ++i) {
1273  descriptions.push_back(itFound->getWorker(i)->description());
1274  }
1275  }
1276  }
std::string const & name() const
Definition: Path.h:66

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 1236 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1236  {
1237  TrigPaths::const_iterator itFound = std::find_if(
1238  trig_paths_.begin(),
1239  trig_paths_.end(),
1240  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1241  if (itFound != trig_paths_.end()) {
1242  oLabelsToFill.reserve(itFound->size());
1243  for (size_t i = 0; i < itFound->size(); ++i) {
1244  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1245  }
1246  }
1247  }
std::string const & name() const
Definition: Path.h:66

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 231 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ postScheduleSignal()

template<typename T >
void edm::StreamSchedule::postScheduleSignal ( StreamContext const *  streamContext,
ServiceWeakToken const &  weakToken,
std::exception_ptr &  excpt 
) const
privatenoexcept

Definition at line 432 of file StreamSchedule.h.

References cms::Exception::addContext(), edm::exceptionContext(), findAndChange::op, and edm::convertException::wrap().

434  {
435  try {
436  convertException::wrap([this, &weakToken, streamContext]() {
437  ServiceRegistry::Operate op(weakToken.lock());
438  T::postScheduleSignal(actReg_.get(), streamContext);
439  });
440  } catch (cms::Exception& ex) {
441  if (not excpt) {
442  std::ostringstream ost;
443  ex.addContext("Handling post signal, likely in a service function");
444  exceptionContext(ost, *streamContext);
445  ex.addContext(ost.str());
446  excpt = std::current_exception();
447  }
448  }
449  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())

◆ preScheduleSignal()

template<typename T >
void edm::StreamSchedule::preScheduleSignal ( StreamContext const *  streamContext) const
private

Definition at line 419 of file StreamSchedule.h.

References actReg_, cms::Exception::addContext(), edm::exceptionContext(), and edm::convertException::wrap().

419  {
420  try {
421  convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
422  } catch (cms::Exception& ex) {
423  std::ostringstream ost;
424  ex.addContext("Handling pre signal, likely in a service function");
425  exceptionContext(ost, *streamContext);
426  ex.addContext(ost.str());
427  throw;
428  }
429  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 1030 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, SiStripBadComponentsDQMServiceTemplate_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManagerLumisAndEvents_.

1034  {
1035  EventPrincipal& ep = info.principal();
1036 
1037  // Caught exception is propagated via WaitingTaskHolder
1038  CMS_SA_ALLOW try {
1039  this->resetAll();
1040 
1041  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1042 
1043  Traits::setStreamContext(streamContext_, ep);
1044  //a service may want to communicate with another service
1045  ServiceRegistry::Operate guard(serviceToken);
1046  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1047 
1048  // Data dependencies need to be set up before marking empty
1049  // (End)Paths complete in case something consumes the status of
1050  // the empty (EndPath)
1053 
1054  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1055  for (int empty_trig_path : empty_trig_paths_) {
1056  results_->at(empty_trig_path) = hltPathStatus;
1057  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1058  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1059  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1060  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1061  if (except) {
1062  iTask.doneWaiting(except);
1063  return;
1064  }
1065  }
1066  if (not endPathStatusInserterWorkers_.empty()) {
1067  for (int empty_end_path : empty_end_paths_) {
1068  std::exception_ptr except =
1069  endPathStatusInserterWorkers_[empty_end_path]
1070  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1071  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1072  if (except) {
1073  iTask.doneWaiting(except);
1074  return;
1075  }
1076  }
1077  }
1078 
1079  ++total_events_;
1080 
1081  //use to give priorities on an error to ones from Paths
1082  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1083  auto pathErrorPtr = pathErrorHolder.get();
1084  ServiceWeakToken weakToken = serviceToken;
1085  auto allPathsDone = make_waiting_task(
1086  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1087  ServiceRegistry::Operate operate(weakToken.lock());
1088 
1089  std::exception_ptr ptr;
1090  if (pathError->load()) {
1091  ptr = *pathError->load();
1092  delete pathError->load();
1093  }
1094  if ((not ptr) and iPtr) {
1095  ptr = *iPtr;
1096  }
1097  iTask.doneWaiting(finishProcessOneEvent(ptr));
1098  });
1099  //The holder guarantees that if the paths finish before the loop ends
1100  // that we do not start too soon. It also guarantees that the task will
1101  // run under that condition.
1102  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1103 
1104  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1105  std::exception_ptr const* iPtr) mutable {
1106  ServiceRegistry::Operate operate(weakToken.lock());
1107 
1108  if (iPtr) {
1109  //this is used to prioritize this error over one
1110  // that happens in EndPath or Accumulate
1111  pathErrorPtr->store(new std::exception_ptr(*iPtr));
1112  }
1113  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1114  });
1115 
1116  //The holder guarantees that if the paths finish before the loop ends
1117  // that we do not start too soon. It also guarantees that the task will
1118  // run under that condition.
1119  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1120 
1121  //start end paths first so on single threaded the paths will run first
1122  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1123  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1124  it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1125  }
1126 
1127  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1128  it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1129  }
1130 
1131  ParentContext parentContext(&streamContext_);
1132  workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1133  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1134  } catch (...) {
1135  iTask.doneWaiting(std::current_exception());
1136  }
1137  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
WorkerManager workerManagerLumisAndEvents_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 361 of file StreamSchedule.h.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::WaitingTaskHolder::group(), watchdog::group, h, handleException(), info(), edm::InLumi, edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), eostools::move(), findAndChange::op, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::StreamID::value(), workerManagerLumisAndEvents_, and workerManagerRuns_.

364  {
365  auto group = iHolder.group();
366  auto const& principal = transitionInfo.principal();
367  T::setStreamContext(streamContext_, principal);
368 
369  ServiceWeakToken weakToken = token;
370  auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
371  std::exception_ptr const* iPtr) mutable {
372  std::exception_ptr excpt;
373  if (iPtr) {
374  excpt = *iPtr;
375  handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
376  }
377  postScheduleSignal<T>(&streamContext_, weakToken, excpt);
378  iHolder.doneWaiting(excpt);
379  });
380 
381  auto task =
382  make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
383  auto token = weakToken.lock();
385  // Caught exception is propagated via WaitingTaskHolder
386  WorkerManager* workerManager = &workerManagerRuns_;
387  if (T::branchType_ == InLumi) {
388  workerManager = &workerManagerLumisAndEvents_;
389  }
390  CMS_SA_ALLOW try {
391  preScheduleSignal<T>(&streamContext_);
392  workerManager->resetAll();
393  } catch (...) {
394  h.doneWaiting(std::current_exception());
395  return;
396  }
397 
398  workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
399  });
400 
401  if (streamID_.value() == 0) {
402  //Enqueueing will start another thread if there is only
403  // one thread in the job. Having stream == 0 use spawn
404  // avoids starting up another thread when there is only one stream.
405  group->run([task]() {
406  TaskSentry s{task};
407  task->execute();
408  });
409  } else {
410  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
411  arena.enqueue([task]() {
412  TaskSentry s{task};
413  task->execute();
414  });
415  }
416  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
WorkerManager workerManagerRuns_
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void handleException(StreamContext const &, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
unsigned int value() const
Definition: StreamID.h:43
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
long double T
WorkerManager workerManagerLumisAndEvents_
def move(src, dest)
Definition: eostools.py:511

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 989 of file StreamSchedule.cc.

References allWorkersBeginEnd(), allWorkersLumisAndEvents(), allWorkersRuns(), edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

989  {
990  for (auto const& worker : allWorkersBeginEnd()) {
991  if (worker->description()->moduleLabel() == iLabel) {
992  iMod->replaceModuleFor(worker);
993  worker->beginStream(streamID_, streamContext_);
994  break;
995  }
996  }
997 
998  for (auto const& worker : allWorkersRuns()) {
999  if (worker->description()->moduleLabel() == iLabel) {
1000  iMod->replaceModuleFor(worker);
1001  break;
1002  }
1003  }
1004 
1005  for (auto const& worker : allWorkersLumisAndEvents()) {
1006  if (worker->description()->moduleLabel() == iLabel) {
1007  iMod->replaceModuleFor(worker);
1008  break;
1009  }
1010  }
1011  }
AllWorkers const & allWorkersBeginEnd() const
returns the collection of pointers to workers
StreamContext streamContext_
AllWorkers const & allWorkersLumisAndEvents() const
AllWorkers const & allWorkersRuns() const

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 355 of file StreamSchedule.h.

References SiStripBadComponentsDQMServiceTemplate_cfg::ep.

355  {
356  Service<JobReport> reportSvc;
357  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
358  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1368 of file StreamSchedule.cc.

References results_.

Referenced by processOneEventAsync().

1368 { results_->reset(); }
edm::propagate_const< TrigResPtr > results_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1372 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1372  {
1373  //must be sure we have cleared the count first
1374  for (auto& count : earlyDeleteBranchToCount_) {
1375  count.count = 0;
1376  }
1377  //now reset based on how many helpers use that branch
1379  ++(earlyDeleteBranchToCount_[index].count);
1380  }
1381  for (auto& helper : earlyDeleteHelpers_) {
1382  helper.reset();
1383  }
1384  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 297 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

297 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 298 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

298 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 167 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

167 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 194 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

194 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 202 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

202 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 198 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

198 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 729 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, AlCaHLTBitMon_QueryRunRegistry::string, edm::productholderindexhelper::typeIsViewCompatible(), and workerManagerLumisAndEvents_.

Referenced by fillWorkers().

737  {
738  std::vector<Worker*> returnValue;
739  auto const& consumesInfo = worker->consumesInfo();
740  auto moduleLabel = worker->description()->moduleLabel();
741  using namespace productholderindexhelper;
742  for (auto const& ci : consumesInfo) {
743  if (not ci.skipCurrentProcess() and
744  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
745  auto productModuleLabel = std::string(ci.label());
746  bool productFromConditionalModule = false;
747  auto itFound = conditionalModules.find(productModuleLabel);
748  if (itFound == conditionalModules.end()) {
749  //Check to see if this was an alias
750  //note that aliasMap was previously filtered so only the conditional modules remain there
751  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
752  if (foundAlias) {
753  productModuleLabel = *foundAlias;
754  productFromConditionalModule = true;
755  itFound = conditionalModules.find(productModuleLabel);
756  //check that the alias-for conditional module has not been used
757  if (itFound == conditionalModules.end()) {
758  continue;
759  }
760  }
761  } else {
762  //need to check the rest of the data product info
763  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
764  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
765  if (itBranch->second->productInstanceName() == ci.instance()) {
766  if (ci.kindOfType() == PRODUCT_TYPE) {
767  if (ci.type() == itBranch->second->unwrappedTypeID()) {
768  productFromConditionalModule = true;
769  break;
770  }
771  } else {
772  //this is a view
774  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
775  productFromConditionalModule = true;
776  break;
777  }
778  }
779  }
780  }
781  }
782  if (productFromConditionalModule) {
783  auto condWorker = getWorker(
784  productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
785  assert(condWorker);
786 
787  conditionalModules.erase(itFound);
788 
789  auto dependents = tryToPlaceConditionalModules(condWorker,
790  conditionalModules,
791  conditionalModuleBranches,
792  aliasMap,
793  proc_pset,
794  preg,
795  prealloc,
796  processConfiguration);
797  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
798  returnValue.push_back(condWorker);
799  }
800  }
801  }
802  return returnValue;
803  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
WorkerManager workerManagerLumisAndEvents_

◆ unscheduledWorkersLumisAndEvents()

AllWorkers const& edm::StreamSchedule::unscheduledWorkersLumisAndEvents ( ) const
inline

Definition at line 228 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManagerLumisAndEvents_.

228  {
230  }
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:81
WorkerManager workerManagerLumisAndEvents_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 335 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 345 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 342 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 330 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 329 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 325 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 349 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 324 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 321 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 323 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 347 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 348 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManagerBeginEnd_

WorkerManager edm::StreamSchedule::workerManagerBeginEnd_
private

◆ workerManagerLumisAndEvents_

WorkerManager edm::StreamSchedule::workerManagerLumisAndEvents_
private

◆ workerManagerRuns_

WorkerManager edm::StreamSchedule::workerManagerRuns_
private